GitHub Repository: torvalds/linux
Path: blob/master/fs/ceph/mds_client.c
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#include <trace/events/ceph.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage. Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid. If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}
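
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * the decode above follows the generic pattern for CephFS versioned
 * struct encodings -- a struct_v byte, a struct_compat byte, and a u32
 * payload length, followed by the payload. A hypothetical decoder for
 * a struct carrying a single u64 would look roughly like:
 *
 *	u8 struct_v, struct_compat;
 *	u32 struct_len;
 *	u64 value;
 *
 *	ceph_decode_8_safe(p, end, struct_v, bad);
 *	ceph_decode_8_safe(p, end, struct_compat, bad);
 *	if (!struct_v || struct_compat != 1)
 *		goto bad;
 *	ceph_decode_32_safe(p, end, struct_len, bad);
 *	ceph_decode_need(p, end, struct_len, bad);
 *	end = *p + struct_len;	// clamp 'end' to this struct only
 *	ceph_decode_64_safe(p, end, value, bad);
 *	*p = end;		// skip any fields we don't understand
 *
 * The *_safe() helpers bounds-check against 'end' and jump to the
 * 'bad' label on truncated input; setting *p = end at the close is
 * what makes newer, longer encodings forward-compatible.
 */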

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}
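
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * a 'features' value of (u64)-1 is used throughout this file as a
 * sentinel meaning "the peer speaks the new, self-describing
 * (versioned) encoding", so the decoder is gated on struct_v rather
 * than on per-feature bits. Anything else is treated as a legacy
 * feature bitmask and tested field by field, e.g.:
 *
 *	if (features == (u64)-1)
 *		decode_versioned(p, end, info);	// struct_v-gated fields
 *	else if (features & CEPH_FEATURE_MDS_QUOTA)
 *		decode_quota(p, end, info);	// feature-bit-gated field
 *
 * decode_versioned() and decode_quota() are hypothetical names for
 * the two styles shown inline above.
 */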

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encodings whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encodings whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	*p = lend;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
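
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * a trace blob is therefore laid out as
 *
 *	[diri][dirfrag][dname_len:u32][dname][dlease]	when is_dentry
 *	[targeti]					when is_target
 *
 * with either half optional and the whole blob required to be
 * consumed exactly (*p == end) on success.
 */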

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * _name_len may be larger than altname_len, for example
		 * when the human-readable name length is in the range
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE);
		 * in that case the copy in ceph_fname_to_usr would corrupt
		 * the data if there is no encryption key.
		 *
		 * Just set the no_copy flag; then, if there is no
		 * encryption key, oname.name is always assigned to
		 * _name.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place. It's
			 * safe because the decoded string is always
			 * shorter, at most 3/4 of the original
			 * string's length.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption in-place
			 * from the altname ciphertext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}
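
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * each readdir entry on the wire is therefore
 *
 *	[dname_len:u32][dname][lease (+ v2 altname)][inode info]
 *
 * preceded once per fragment by [dirfrag][num:u32][flags:u16]. The
 * CEPH_READDIR_* bits tested above all arrive packed in that single
 * u16 flags field.
 */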

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
				"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
				start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n", start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
					"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */
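
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * the 64-bit variant uses the inode number itself as the xarray index
 * and the tagged pointer xa_mk_value(1) as an "available" marker, so
 * claiming a delegated ino is an erase-and-check:
 *
 *	if (xa_erase(&s->s_delegated_inos, ino) ==
 *	    DELEGATED_INO_AVAILABLE) {
 *		// 'ino' is now ours to use, e.g. for an async create
 *	}
 *
 * xa_insert() returns -EBUSY when the index is already present, which
 * is how a duplicate delegation from the MDS is detected above.
 */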

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_request *req,
				  u64 features, struct ceph_mds_session *s)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, req, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}
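
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * an MDS reply message therefore decodes as
 *
 *	struct ceph_mds_reply_head
 *	[trace_len:u32][trace blob]	-> parse_reply_info_trace()
 *	[extra_len:u32][extra blob]	-> parse_reply_info_extra()
 *	[snap_len:u32][snap blob]	(kept opaque here)
 *
 * with each length-prefixed blob parsed against its own sub-'end'
 * pointer (p + len), so a malformed section cannot run past its
 * declared size.
 */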

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	int i;

	kfree(info->diri.fscrypt_auth);
	kfree(info->diri.fscrypt_file);
	kfree(info->targeti.fscrypt_auth);
	kfree(info->targeti.fscrypt_file);
	if (!info->dir_entries)
		return;

	for (i = 0; i < info->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

		kfree(rde->inode.fscrypt_auth);
		kfree(rde->inode.fscrypt_file);
	}
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS: it just drops all the links, unhashes the dentry and
 * then succeeds immediately.
 *
 * For any new create/link/rename, etc. requests that follow and use
 * the same file names, we must wait for the first reply of the
 * inflight unlink request, or the MDS may fail those following
 * requests with -EEXIST if the inflight async unlink request was
 * delayed for some reason.
 *
 * The worst case is that for a non-async openc request the MDS will
 * successfully open the file if the CDentry hasn't been unlinked yet,
 * but later the previously delayed async unlink request will remove
 * the CDentry. That means the just-created file may be deleted later
 * by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories with the same file names.
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}
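
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * callers on the create path are expected to gate on this helper
 * before issuing a request that reuses a recently unlinked name,
 * roughly:
 *
 *	err = ceph_wait_on_conflict_unlink(dentry);
 *	if (err)
 *		return err;	// e.g. interrupted by a fatal signal
 *	// safe to send the create/link/rename request now
 */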


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref))
		return s;
	return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	if (IS_ERR_OR_NULL(s))
		return;

	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;
		size_t ptr_size = sizeof(struct ceph_mds_session *);

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, ptr_size, GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * ptr_size);
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
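
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * the sessions[] array grows to the next power of two that covers the
 * requested rank. For example, a first session for mds7 allocates
 * 1 << get_count_order(7 + 1) == 8 slots, and subsequent sessions for
 * ranks below 8 then reuse the same array without reallocating.
 */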

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid. Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose mds to send request to next. If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds. If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try? ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}
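
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * the selection above falls through in priority order --
 *
 *	1. the req->r_resend_mds hint, if that mds is usable
 *	2. a dir-fragment replica or auth mds from the frag tree
 *	3. the mds holding a cap on the relevant inode (the auth cap
 *	   preferred in USE_AUTH_MDS mode)
 *	4. a random mds from the mdsmap
 *
 * so a request only falls back to a random rank when no locality
 * information is available.
 */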


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}
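
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * FEATURE_BYTES() rounds the highest supported feature bit up to a
 * whole number of 64-bit words, expressed in bytes (feature_bits[] is
 * assumed to be sorted ascending, so its last element is the highest
 * bit). For example, if the largest bit were 17, then
 * DIV_ROUND_UP(17 + 1, 64) * 8 == 8: one u64's worth of bitmap is
 * encoded, and bit 17 is set via bitmap[17 / 8] |= BIT(17 % 8).
 */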

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(metric_bits);

	/* header */
	if (WARN_ON_ONCE(*p + 2 > end))
		return -ERANGE;

	ceph_encode_8(p, 1); /* version */
	ceph_encode_8(p, 1); /* compat */

	if (count > 0) {
		size_t i;
		size_t size = METRIC_BYTES(count);

		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4 + size);

		/* metric spec */
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
		*p += size;
	} else {
		if (WARN_ON_ONCE(*p + 4 + 4 > end))
			return -ERANGE;

		/* metric spec info length */
		ceph_encode_32(p, 4);
		/* metric spec */
		ceph_encode_32(p, 0);
	}

	return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	const char *metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
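
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * the v7 session message body assembled above is laid out as
 *
 *	struct ceph_mds_session_head (op, seq)
 *	map<string,string> client metadata (count + len-prefixed pairs)
 *	supported-features bitmap
 *	metric spec
 *	flags:u32		(v5 field)
 *	mds auth caps:u32	(v6 field)
 *	oldest_client_tid:u64	(v7 field)
 *
 * and the extra_bytes computed beforehand must match what the encode
 * helpers actually write, or the WARN_ON_ONCE() bounds checks fire.
 */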

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	int ret;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
		ret = __open_session(mdsc, session);
		if (ret)
			return ERR_PTR(ret);
	}

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
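
/*
 * Illustrative aside (editor's sketch, not part of the original file):
 * a typical caller supplies a per-inode callback plus an opaque
 * cookie, e.g. a hypothetical cap counter:
 *
 *	static int count_cap_cb(struct inode *inode, int mds, void *arg)
 *	{
 *		(*(int *)arg)++;	// hypothetical counter
 *		return 0;		// a negative return aborts the walk
 *	}
 *
 *	int n = 0;
 *	ceph_iterate_session_caps(session, count_cap_cb, &n);
 *
 * The helper drops s_cap_lock around the callback, so the callback
 * may sleep; the igrab()/iput() pair pins each inode across that
 * window.
 */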

static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, so we need to wait until deletions are
		 * complete. __wait_on_freeing_inode() is designed for
		 * the job, but it is not exported, so use the inode
		 * lookup function to reach it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps. if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;

	doutc(cl, "session %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}

/*
 * Send periodic message to MDS renewing all currently held caps. The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
		      ceph_mds_state_name(state));
		return 0;
	}

	doutc(cl, "to mds%d (%s)\n", session->s_mds,
	      ceph_mds_state_name(state));
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
				      ++session->s_renew_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), seq);
	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info_client(cl, "mds%d caps renewed\n",
				       session->s_mds);
			wake = 1;
		} else {
			pr_info_client(cl, "mds%d caps still stale\n",
				       session->s_mds);
		}
	}
	doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
	      session->s_cap_ttl, was_stale ? "stale" : "fresh",
	      time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_session *session)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), session->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
				      session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	hlist_for_each_entry(child, &dentry->d_children, d_sib) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy. Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = mdsc->fsc->client;
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
	      ceph_cap_string(oissued), ceph_cap_string(used),
	      ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(mdsc, cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = icount_read(inode);
			if (count == 1)
				(*remaining)--;
			doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
			      inode, ceph_vinop(inode), cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int trim_caps = session->s_nr_caps - max_caps;

	doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
	      session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
		      session->s_mds, session->s_nr_caps, max_caps,
		      trim_caps - remaining);
	}

	ceph_flush_session_cap_releases(mdsc, session);
	return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			doutc(cl, "still flushing tid %llu <= %llu\n",
			      cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

/*
 * flush all dirty inode data to disk.
 *
 * returns when we've flushed through want_flush_tid
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
}
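
/*
 * Illustrative note (editor's addition, not compiled): cap flushes sit on
 * mdsc->cap_flush_list in tid order, so check_caps_flush() only needs to
 * look at the first entry. E.g. with in-flight flush tids {5, 7, 9} and
 * want_flush_tid == 7, wait_caps_flush() keeps sleeping while the head tid
 * is 5 or 7 and finishes once the head advances to 9:
 *
 *	wait_event(mdsc->cap_flushing_wq, check_caps_flush(mdsc, 7));
 */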

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->issue_seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			doutc(cl, "mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		doutc(cl, "mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err_client(cl, "mds%d, failed to allocate message\n",
		      session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
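
/*
 * Illustrative note (editor's addition): each CAPRELEASE message built
 * above has the front layout
 *
 *	struct ceph_mds_cap_release head;	// head.num = item count
 *	struct ceph_mds_cap_item item[head.num];
 *	__le32 cap_barrier;			// OSD epoch barrier, appended last
 *
 * and is sent either when head.num reaches CEPH_CAPS_PER_RELEASE or when
 * the session's release list has been drained.
 */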

static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}

void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	ceph_get_mds_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		doutc(cl, "cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		doutc(cl, "failed to queue cap release work\n");
	}
}

/*
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_session_cap_releases(session->s_mdsc, session);
}
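
/*
 * Illustrative note (editor's addition): s_num_cap_releases only grows
 * here (the send path splices the release list out and resets the count),
 * so the modulo test above fires each time another CEPH_CAPS_PER_RELEASE
 * caps have been queued, i.e. once a full CAPRELEASE message's worth has
 * accumulated.
 */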

static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}

void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
		doutc(cl, "caps reclaim work queued\n");
	} else {
		doutc(cl, "failed to queue caps reclaim work\n");
	}
}

void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}
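
/*
 * Illustrative note (editor's addition): the test above detects when the
 * running total crosses a multiple of CEPH_CAPS_PER_RELEASE. With the
 * counter at val - nr before the add, (val % N) < nr holds exactly when
 * some multiple of N lies in (val - nr, val]. E.g. if N were 128, going
 * from 120 to 130 gives 130 % 128 == 2 < 10, so the pending count is
 * reset and reclaim work is queued.
 */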

void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
		doutc(cl, "caps unlink work queued\n");
	} else {
		doutc(cl, "failed to queue caps unlink work\n");
	}
}

static void ceph_cap_unlink_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_unlink_work);
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_unlink_delay_list)) {
		struct ceph_inode_info *ci;
		struct inode *inode;

		ci = list_first_entry(&mdsc->cap_unlink_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->netfs.inode);
		if (inode) {
			spin_unlock(&mdsc->cap_delay_lock);
			doutc(cl, "on %p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			ceph_check_caps(ci, CHECK_CAPS_FLUSH);
			iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
	doutc(cl, "done\n");
}

/*
 * requests
 */

int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	unsigned int num_entries;
	u64 bytes_count;
	int order;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1U);
	num_entries = min(num_entries, opt->max_readdir);

	bytes_count = (u64)size * num_entries;
	if (unlikely(bytes_count > ULONG_MAX))
		bytes_count = ULONG_MAX;

	order = get_order((unsigned long)bytes_count);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN |
							     __GFP_ZERO,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries || unlikely(order < 0))
		return -ENOMEM;

	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}
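
/*
 * Worked example (editor's addition, assuming 4 KiB pages and, say, a
 * 40-byte ceph_mds_reply_dir_entry): for a directory with i_files +
 * i_subdirs == 1000, bytes_count is 40000, so get_order() asks for an
 * order-4 (64 KiB) block. If that allocation fails, the loop retries with
 * order 3, 2, ..., and num_entries is then recomputed from the buffer
 * actually obtained, e.g. (4096 << 2) / 40 == 409 entries at order 2.
 */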

/*
 * Create an mds request.
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req;

	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_start_latency = ktime_get();
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	req->r_feature_needed = -1;
	kref_init(&req->r_kref);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	ktime_get_coarse_real_ts64(&req->r_stamp);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}
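
/*
 * Typical caller pattern (editor's sketch, modeled on the lookup path in
 * dir.c; the exact fields set vary per operation):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);	// drop the ref taken at create
 */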

/*
 * return the oldest (lowest tid) request in the request tree, or
 * NULL if none.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
{
	if (RB_EMPTY_ROOT(&mdsc->request_tree))
		return NULL;
	return rb_entry(rb_first(&mdsc->request_tree),
			struct ceph_mds_request, r_node);
}

static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	struct inode *dir = req->r_parent;
	struct dentry *dentry = req->r_dentry;
	const struct qstr *name = req->r_dname;
	u8 *cryptbuf = NULL;
	u32 len = 0;
	int ret = 0;

	/* only encode if we have parent and dentry */
	if (!dir || !dentry)
		goto success;

	/* No-op unless this is encrypted */
	if (!IS_ENCRYPTED(dir))
		goto success;

	ret = ceph_fscrypt_prepare_readdir(dir);
	if (ret < 0)
		return ERR_PTR(ret);

	/* No key? Just ignore it. */
	if (!fscrypt_has_encryption_key(dir))
		goto success;

	if (!name)
		name = &dentry->d_name;

	if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) {
		WARN_ON_ONCE(1);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* No need to append altname if name is short enough */
	if (len <= CEPH_NOHASH_NAME_MAX) {
		len = 0;
		goto success;
	}

	cryptbuf = kmalloc(len, GFP_KERNEL);
	if (!cryptbuf)
		return ERR_PTR(-ENOMEM);

	ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len);
	if (ret) {
		kfree(cryptbuf);
		return ERR_PTR(ret);
	}
success:
	*plen = len;
	return cryptbuf;
}
#else
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	*plen = 0;
	return NULL;
}
#endif

/**
 * ceph_mdsc_build_path - build a path string to a given dentry
 * @mdsc: mds client
 * @dentry: dentry to which path should be built
 * @path_info: output path, length, base ino+snap, and freepath ownership flag
 * @for_wire: is this path going to be sent to the MDS?
 *
 * Build a string that represents the path to the dentry. This is mostly called
 * for two different purposes:
 *
 * 1) we need to build a path string to send to the MDS (for_wire == true)
 * 2) we need a path string for local presentation (e.g. debugfs)
 *    (for_wire == false)
 *
 * The path is built in reverse, starting with the dentry. Walk back up toward
 * the root, building the path until the first non-snapped inode is reached
 * (for_wire) or the root inode is reached (!for_wire).
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 */
char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			   struct ceph_path_info *path_info, int for_wire)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct dentry *cur;
	struct inode *inode;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	seq = read_seqbegin(&rename_lock);
	cur = dget(dentry);
	for (;;) {
		struct dentry *parent;

		spin_lock(&cur->d_lock);
		inode = d_inode(cur);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else if (for_wire && inode && dentry != cur &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&cur->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
			pos -= cur->d_name.len;
			if (pos < 0) {
				spin_unlock(&cur->d_lock);
				break;
			}
			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else {
			int len, ret;
			char buf[NAME_MAX];

			/*
			 * Proactively copy name into buf, in case we need to
			 * present it as-is.
			 */
			memcpy(buf, cur->d_name.name, cur->d_name.len);
			len = cur->d_name.len;
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);

			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
			if (ret < 0) {
				dput(parent);
				dput(cur);
				return ERR_PTR(ret);
			}

			if (fscrypt_has_encryption_key(d_inode(parent))) {
				len = ceph_encode_encrypted_dname(d_inode(parent),
								  buf, len);
				if (len < 0) {
					dput(parent);
					dput(cur);
					return ERR_PTR(len);
				}
			}
			pos -= len;
			if (pos < 0) {
				dput(parent);
				break;
			}
			memcpy(path + pos, buf, len);
		}
		dput(cur);
		cur = parent;

		/* Are we at the root? */
		if (IS_ROOT(cur))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	inode = d_inode(cur);
	base = inode ? ceph_ino(inode) : 0;
	dput(cur);

	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * The path is longer than PATH_MAX and this function
		 * cannot ever succeed. Creating paths that long is
		 * possible with Ceph, but Linux cannot use them.
		 */
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	path_info->vino.ino = base;
	path_info->pathlen = PATH_MAX - 1 - pos;
	path_info->path = path + pos;
	path_info->freepath = true;

	/* Set snap from dentry if available */
	if (d_inode(dentry))
		path_info->vino.snap = ceph_snap(d_inode(dentry));
	else
		path_info->vino.snap = CEPH_NOSNAP;

	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
	      base, PATH_MAX - 1 - pos, path + pos);
	return path + pos;
}
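
/*
 * Example (editor's addition): for foo/.snap/bar the walk above starts at
 * "bar" and prepends components right-to-left into the PATH_MAX buffer.
 * The hidden .snap directory contributes no name, only its '/' separator,
 * so the resulting wire path is "foo//bar", with path_info->vino.ino set
 * to the non-snapped ancestor that terminated the walk.
 */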

static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			     struct inode *dir, struct ceph_path_info *path_info,
			     bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
	    !IS_ENCRYPTED(dir)) {
		path_info->vino.ino = ceph_ino(dir);
		path_info->vino.snap = ceph_snap(dir);
		rcu_read_unlock();
		path_info->path = dentry->d_name.name;
		path_info->pathlen = dentry->d_name.len;
		path_info->freepath = false;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap handling.
	 */
	return 0;
}

static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		path_info->vino.ino = ceph_ino(inode);
		path_info->vino.snap = ceph_snap(inode);
		path_info->pathlen = 0;
		path_info->freepath = false;
		return 0;
	}
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
	 * Override with inode's snap since that's what this function is for.
	 */
	path_info->vino.snap = ceph_snap(inode);
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 */
static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
				 struct dentry *rdentry, struct inode *rdiri,
				 const char *rpath, u64 rino,
				 struct ceph_path_info *path_info,
				 bool parent_locked)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int r = 0;

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	if (rinode) {
		r = build_inode_path(rinode, path_info);
		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		      ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
		      path_info->pathlen, path_info->path);
	} else if (rpath || rino) {
		path_info->vino.ino = rino;
		path_info->vino.snap = CEPH_NOSNAP;
		path_info->path = rpath;
		path_info->pathlen = rpath ? strlen(rpath) : 0;
		path_info->freepath = false;

		doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
	}

	return r;
}

static void encode_mclientrequest_tail(void **p,
				       const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* v4: gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));

	/* v5: altname */
	ceph_encode_32(p, req->r_altname_len);
	ceph_encode_copy(p, req->r_altname, req->r_altname_len);

	/* v6: fscrypt_auth and fscrypt_file */
	if (req->r_fscrypt_auth) {
		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);

		ceph_encode_32(p, authlen);
		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
	} else {
		ceph_encode_32(p, 0);
	}
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
		ceph_encode_32(p, sizeof(__le64));
		ceph_encode_64(p, req->r_fscrypt_file);
	} else {
		ceph_encode_32(p, 0);
	}
}
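
/*
 * Illustrative note (editor's addition): the tail encoded above is, in
 * wire order,
 *
 *	struct ceph_timespec stamp;		// r_stamp
 *	u32 ngroups; u64 gids[ngroups];		// v4: gid_list
 *	u32 altname_len; u8 altname[];		// v5: altname
 *	u32 fscrypt_auth_len; u8 auth[];	// v6: fscrypt_auth
 *	u32 fscrypt_file_len; __le64 size;	// v6: length is 0 or 8
 */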

static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
{
	if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
		return 1;

	if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
		return 2;

	return CEPH_MDS_REQUEST_HEAD_VERSION;
}

static struct ceph_mds_request_head_legacy *
find_legacy_request_head(void *p, u64 features)
{
	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
	struct ceph_mds_request_head *head;

	if (legacy)
		return (struct ceph_mds_request_head_legacy *)p;
	head = (struct ceph_mds_request_head *)p;
	return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
}
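
/*
 * Illustrative note (editor's addition): the current request head only
 * prepends a version field in front of the legacy layout, so the legacy
 * head starts either at the buffer itself (pre-FS_BTIME peer) or at
 * &head->oldest_client_tid, the first field the two encodings share.
 * Fields common to both can then be written through the returned pointer
 * regardless of which encoding is in use.
 */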

/*
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_path_info path_info1 = {0};
	struct ceph_path_info path_info2 = {0};
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
	u16 request_head_version = mds_supported_head_version(session);
	kuid_t caller_fsuid = req->r_cred->fsuid;
	kgid_t caller_fsgid = req->r_cred->fsgid;
	bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);

	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
				    req->r_parent, req->r_path1, req->r_ino1.ino,
				    &path_info1, parent_locked);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/*
	 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
	 * have become stale (e.g. after a concurrent rename) between the time the
	 * dentry was looked up and now. If we detect that the stored r_parent
	 * does not match the inode number we just encoded for the request, switch
	 * to the correct inode so that the MDS receives a valid parent reference.
	 */
	if (!parent_locked && req->r_parent && path_info1.vino.ino &&
	    ceph_ino(req->r_parent) != path_info1.vino.ino) {
		struct inode *old_parent = req->r_parent;
		struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
		if (!IS_ERR(correct_dir)) {
			WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
				  ceph_ino(old_parent), path_info1.vino.ino);
			/*
			 * Transfer CEPH_CAP_PIN from the old parent to the new one.
			 * The pin was taken earlier in ceph_mdsc_submit_request().
			 */
			ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
			iput(old_parent);
			req->r_parent = correct_dir;
			ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		}
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(mdsc, NULL, old_dentry,
				    req->r_old_dentry_dir,
				    req->r_path2, req->r_ino2.ino,
				    &path_info2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
	if (IS_ERR(req->r_altname)) {
		msg = ERR_CAST(req->r_altname);
		req->r_altname = NULL;
		goto out_free2;
	}

	/*
	 * Old MDSes that lack the 32-bit retry/fwd feature copy the raw
	 * head memory directly when decoding requests, while newer ones
	 * decode the head based on its version member, so make sure the
	 * encoding stays compatible with both.
	 */
	if (legacy)
		len = sizeof(struct ceph_mds_request_head_legacy);
	else if (request_head_version == 1)
		len = offsetofend(struct ceph_mds_request_head, args);
	else if (request_head_version == 2)
		len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	else
		len = sizeof(struct ceph_mds_request_head);

	/* filepaths */
	len += 2 * (1 + sizeof(u32) + sizeof(u64));
	len += path_info1.pathlen + path_info2.pathlen;

	/* cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	if (req->r_dentry_drop)
		len += path_info1.pathlen;
	if (req->r_old_dentry_drop)
		len += path_info2.pathlen;

	/* MClientRequest tail */

	/* req->r_stamp */
	len += sizeof(struct ceph_timespec);

	/* gid list */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* alternate name */
	len += sizeof(u32) + req->r_altname_len;

	/* fscrypt_auth */
	len += sizeof(u32); // fscrypt_auth
	if (req->r_fscrypt_auth)
		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);

	/* fscrypt_file */
	len += sizeof(u32);
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
		len += sizeof(__le64);

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);

	if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
	    !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
		WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));

		if (enable_unsafe_idmap) {
			pr_warn_once_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. UID/GID-based restrictions may"
				" not work properly.\n");

			caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						   VFSUIDT_INIT(req->r_cred->fsuid));
			caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						   VFSGIDT_INIT(req->r_cred->fsgid));
		} else {
			pr_err_ratelimited_client(cl,
				"idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
				" is not supported by MDS. Fail request with -EIO.\n");

			ret = -EIO;
			goto out_err;
		}
	}

	/*
	 * The ceph_mds_request_head_legacy didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		p = msg->front.iov_base + sizeof(*lhead);
	} else if (request_head_version == 1) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		nhead->version = cpu_to_le16(1);
		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
	} else if (request_head_version == 2) {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(2);

		p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
	} else {
		struct ceph_mds_request_head *nhead = msg->front.iov_base;
		kuid_t owner_fsuid;
		kgid_t owner_fsgid;

		msg->hdr.version = cpu_to_le16(6);
		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));

		if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
			owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
						  VFSUIDT_INIT(req->r_cred->fsuid));
			owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
						  VFSGIDT_INIT(req->r_cred->fsgid));
			nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
			nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
		} else {
			nhead->owner_uid = cpu_to_le32(-1);
			nhead->owner_gid = cpu_to_le32(-1);
		}

		p = msg->front.iov_base + sizeof(*nhead);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	lhead->op = cpu_to_le32(req->r_op);
	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						  caller_fsuid));
	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						  caller_fsgid));
	lhead->ino = cpu_to_le64(req->r_deleg_ino);
	lhead->args = req->r_args;

	ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
	ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
			req->r_inode ? req->r_inode : d_inode(req->r_dentry),
			mds, req->r_inode_drop, req->r_inode_unless,
			req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_dentry_drop) {
		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
		if (ret < 0)
			goto out_err;
		releases += ret;
	}
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
			d_inode(req->r_old_dentry),
			mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	lhead->num_releases = cpu_to_le16(releases);

	encode_mclientrequest_tail(&p, req);

	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	ceph_mdsc_free_path_info(&path_info2);
out_free1:
	ceph_mdsc_free_path_info(&path_info1);
out:
	return msg;
out_err:
	ceph_msg_put(msg);
	msg = ERR_PTR(ret);
	goto out_free2;
}
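
/*
 * Illustrative note (editor's addition): the request front assembled above
 * ends up laid out as
 *
 *	request head (legacy, v1, v2 or current, per peer features)
 *	filepath 1 (ino + path string)
 *	filepath 2 (ino + path string)
 *	cap/dentry releases		<- r_request_release_offset
 *	MClientRequest tail (see encode_mclientrequest_tail())
 *
 * r_request_release_offset is remembered so that a replay can strip the
 * releases and re-encode the tail without rebuilding the paths.
 */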

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	trace_ceph_mdsc_complete_request(mdsc, req);

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_session *session,
				  struct ceph_mds_request *req,
				  bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request_head_legacy *lhead;
	struct ceph_mds_request_head *nhead;
	struct ceph_msg *msg;
	int flags = 0, old_max_retry;
	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
				     &session->s_features);

	/*
	 * Avoid infinite retrying after overflow. The client increments
	 * the retry count on every attempt, but an old-version MDS only
	 * has an 8-bit retry counter on the wire, so in that case limit
	 * it to at most 256 retries.
	 */
	if (req->r_attempts) {
		old_max_retry = sizeof_field(struct ceph_mds_request_head,
					     num_retry);
		old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
		if ((old_version && req->r_attempts >= old_max_retry) ||
		    ((uint32_t)req->r_attempts >= U32_MAX)) {
			pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
						   req->r_tid);
			return -EMULTIHOP;
		}
	}

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
	      ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;

		/*
		 * Replay. Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		lhead = find_legacy_request_head(msg->front.iov_base,
						 session->s_con.peer_features);

		flags = le32_to_cpu(lhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		lhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		lhead->num_retry = req->r_attempts - 1;
		if (!old_version) {
			nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
			nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
		}

		/* remove cap/dentry releases from message */
		lhead->num_releases = 0;

		p = msg->front.iov_base + req->r_request_release_offset;
		encode_mclientrequest_tail(&p, req);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(session, req, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	lhead = find_legacy_request_head(msg->front.iov_base,
					 session->s_con.peer_features);
	lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	lhead->flags = cpu_to_le32(flags);
	lhead->num_fwd = req->r_num_fwd;
	lhead->num_retry = req->r_attempts - 1;
	if (!old_version) {
		nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
		nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
		nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
	}

	doutc(cl, " r_parent = %p\n", req->r_parent);
	return 0;
}

/*
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int err;

	trace_ceph_mdsc_send_request(session, req);

	err = __prepare_send_request(session, req, drop_cap_releases);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

	return err;
}

/*
 * send request, or put it on the appropriate wait list.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
		doutc(cl, "metadata corrupted\n");
		err = -EIO;
		goto finish;
	}
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		doutc(cl, "timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		doutc(cl, "forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			doutc(cl, "mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			doutc(cl, "no mdsmap, waiting for map\n");
			trace_ceph_mdsc_suspend_request(mdsc, session, req,
					ceph_mdsc_suspend_reason_no_mdsmap);
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		doutc(cl, "no mds or not active, waiting for map\n");
		trace_ceph_mdsc_suspend_request(mdsc, session, req,
				ceph_mdsc_suspend_reason_no_active_mds);
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = ceph_get_mds_session(session);

	doutc(cl, "mds%d session %p state %s\n", mds, session,
	      ceph_session_state_name(session->s_state));

	/*
	 * Old MDS versions will crash when they see unknown ops.
	 */
	if (req->r_feature_needed > 0 &&
	    !test_bit(req->r_feature_needed, &session->s_features)) {
		err = -EOPNOTSUPP;
		goto out_session;
	}

	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}

		/*
		 * If the session has been REJECTED, then return a hard error,
		 * unless it's a CLEANRECOVER mount, in which case we'll queue
		 * it to the mdsc queue.
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) {
				trace_ceph_mdsc_suspend_request(mdsc, session, req,
						ceph_mdsc_suspend_reason_rejected);
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			} else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		trace_ceph_mdsc_suspend_request(mdsc, session, req,
				ceph_mdsc_suspend_reason_session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For an async create we choose the auth MDS of the frag in the
	 * parent directory to send the request, and usually this works
	 * fine, but if the directory is migrated to another MDS before it
	 * could handle the request, the request will be forwarded.
	 *
	 * And then the auth cap will be changed.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very quickly, before the new
		 * inode has been linked to the dentry. We need to wait for
		 * ceph_finish_async_create(), which shouldn't get stuck too
		 * long or fail in theory, to finish before forwarding the
		 * request.
		 */
		if (!d_inode(req->r_dentry)) {
			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
					  TASK_KILLABLE);
			if (err) {
				mutex_lock(&req->r_fill_mutex);
				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
				mutex_unlock(&req->r_fill_mutex);
				goto out_session;
			}
		}

		ci = ceph_inode(d_inode(req->r_dentry));

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
			doutc(cl, "session changed for auth cap %d -> %d\n",
			      cap->session->s_mds, session->s_mds);

			/* Remove the auth cap from old session */
			spin_lock(&cap->session->s_cap_lock);
			cap->session->s_nr_caps--;
			list_del_init(&cap->session_caps);
			spin_unlock(&cap->session->s_cap_lock);

			/* Add the auth cap to the new session */
			cap->mds = mds;
			cap->session = session;
			spin_lock(&session->s_cap_lock);
			session->s_nr_caps++;
			list_add_tail(&cap->session_caps, &session->s_caps);
			spin_unlock(&session->s_cap_lock);

			change_auth_cap_ses(ci, session);
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	err = __send_request(session, req, false);

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		doutc(cl, "early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		doutc(cl, " wake request %p tid %llu\n", req,
		      req->r_tid);
		trace_ceph_mdsc_resume_request(mdsc, req);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	doutc(cl, "kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			doutc(cl, " kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			trace_ceph_mdsc_resume_request(mdsc, req);
			__do_request(mdsc, req);
		}
	}
}

int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			doutc(cl, "wait for async create returned: %d\n", err);
			return err;
		}
	}

	doutc(cl, "submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	trace_ceph_mdsc_submit_request(mdsc, req);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
			   struct ceph_mds_request *req,
			   ceph_mds_request_wait_callback_t wait_func)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	/* wait */
	doutc(cl, "do_request waiting\n");
	if (wait_func) {
		err = wait_func(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	doutc(cl, "do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request. Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int err;

	doutc(cl, "do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req, NULL);
	doutc(cl, "do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;
	struct ceph_client *cl = req->r_mdsc->fsc->client;

	doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
	      dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;
	bool close_sessions = false;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err_client(cl, "got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		doutc(cl, "on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	doutc(cl, "handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
			      tid, session->s_mds,
			      req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
			       head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
			       tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);

			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	doutc(cl, "tid %lld result %d\n", tid, result);
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(session, msg, req, (u64)-1);
	else
		err = parse_reply_info(session, msg, req,
				       session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	/* Must find target inode outside of mutexes to avoid deadlocks */
	rinfo = &req->r_reply_info;
	if ((err >= 0) && rinfo->head->is_target) {
		struct inode *in = xchg(&req->r_new_inode, NULL);
		struct ceph_vino tvino = {
			.ino = le64_to_cpu(rinfo->targeti.in->ino),
			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
		};

		/*
		 * If we ended up opening an existing inode, discard
		 * r_new_inode
		 */
		if (req->r_op == CEPH_MDS_OP_CREATE &&
		    !req->r_reply_info.has_create_ino) {
			/* This should never happen on an async create */
			WARN_ON_ONCE(req->r_deleg_ino);
			iput(in);
			in = NULL;
		}

		in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
		if (IS_ERR(in)) {
			err = PTR_ERR(in);
			mutex_lock(&session->s_mutex);
			goto out_err;
		}
		req->r_target_inode = in;
	}

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n",
			      mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		if (err) {
			up_write(&mdsc->snap_rwsem);
			close_sessions = true;
			if (err == -EIO)
				ceph_msg_dump(msg);
			goto out_err;
		}
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			err = ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		doutc(cl, "reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	ceph_mdsc_put_request(req);

	/* Defer closing the sessions until after s_mutex has been released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);
	return;
}
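
/*
 * A brief summary sketch (added annotation) of the reply lifecycle
 * handled above, derived from the code rather than a normative state
 * machine:
 *
 *	unsafe reply: set CEPH_MDS_R_GOT_UNSAFE, keep the request on the
 *		session's s_unsafe list, fill the trace into the cache,
 *		and wake the caller -- the result is usable immediately.
 *	safe reply:   set CEPH_MDS_R_GOT_SAFE and unregister the request;
 *		if an unsafe reply was already processed there is nothing
 *		left to fill in, so only cleanup remains.
 */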

/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
		/*
		 * Avoid infinite retrying after overflow.
		 *
		 * The MDS increments the fwd count with each forward.  On
		 * the client side, if the received fwd_seq is less than the
		 * one saved in the request, the MDS is an old version whose
		 * 8-bit counter has overflowed.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = -EMULTIHOP;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);
		aborted = true;
		pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
					   tid);
	} else {
		/* resend. forward race not possible; mds would drop */
		doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
		req->r_attempts = 0;
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	mutex_unlock(&mdsc->mutex);

	/* kick calling process */
	if (aborted)
		complete_request(mdsc, req);
	ceph_mdsc_put_request(req);
	return;

bad:
	pr_err_client(cl, "decode error err=%d\n", err);
	ceph_msg_dump(msg);
}

static int __decode_session_metadata(void **p, void *end,
				     bool *blocklisted)
{
	/* map<string,string> */
	u32 n;
	bool err_str;

	ceph_decode_32_safe(p, end, n, bad);
	while (n-- > 0) {
		u32 len;

		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		err_str = !strncmp(*p, "error_string", len);
		*p += len;
		ceph_decode_32_safe(p, end, len, bad);
		ceph_decode_need(p, end, len, bad);
		/*
		 * Match "blocklisted (blacklisted)" from newer MDSes,
		 * or "blacklisted" from older MDSes.
		 */
		if (err_str && strnstr(*p, "blacklisted", len))
			*blocklisted = true;
		*p += len;
	}
	return 0;
bad:
	return -1;
}
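
/*
 * Illustrative sketch (added annotation, not a new wire format) of the
 * length-prefixed map<string,string> layout the helper above walks:
 *
 *	__le32 n                       number of key/value pairs
 *	repeated n times:
 *		__le32 klen; char key[klen]
 *		__le32 vlen; char val[vlen]
 *
 * A pair with key "error_string" whose value mentions "blacklisted" is
 * what flips *blocklisted to true.
 */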

/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	struct ceph_mds_cap_auth *cap_auths = NULL;
	u32 op, cap_auths_num = 0;
	u64 seq, features = 0;
	int wake = 0;
	bool blocklisted = false;
	u32 i;

	/* decode */
	ceph_decode_need(&p, end, sizeof(*h), bad);
	h = p;
	p += sizeof(*h);

	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	if (msg_version >= 3) {
		u32 len;
		/*
		 * version >= 2 and < 5: decode metadata; skip it otherwise,
		 * as it's handled via flags.
		 */
		if (msg_version >= 5)
			ceph_decode_skip_map(&p, end, string, string, bad);
		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
			goto bad;

		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		if (len) {
			ceph_decode_64_safe(&p, end, features, bad);
			p += len - sizeof(features);
		}
	}

	if (msg_version >= 5) {
		u32 flags, len;

		/* version >= 4 */
		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
		ceph_decode_32_safe(&p, end, len, bad); /* len */
		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */

		/* version >= 5, flags */
		ceph_decode_32_safe(&p, end, flags, bad);
		if (flags & CEPH_SESSION_BLOCKLISTED) {
			pr_warn_client(cl, "mds%d session blocklisted\n",
				       session->s_mds);
			blocklisted = true;
		}
	}

	if (msg_version >= 6) {
		ceph_decode_32_safe(&p, end, cap_auths_num, bad);
		doutc(cl, "cap_auths_num %d\n", cap_auths_num);

		if (cap_auths_num && op != CEPH_SESSION_OPEN) {
			WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
			goto skip_cap_auths;
		}

		cap_auths = kcalloc(cap_auths_num,
				    sizeof(struct ceph_mds_cap_auth),
				    GFP_KERNEL);
		if (!cap_auths) {
			pr_err_client(cl, "No memory for cap_auths\n");
			return;
		}

		for (i = 0; i < cap_auths_num; i++) {
			u32 _len, j;

			/* struct_v, struct_compat, and struct_len in MDSCapAuth */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);

			/* struct_v, struct_compat, and struct_len in MDSCapMatch */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
								  GFP_KERNEL);
				if (!cap_auths[i].match.gids) {
					pr_err_client(cl, "No memory for gids\n");
					goto fail;
				}

				cap_auths[i].match.num_gids = _len;
				for (j = 0; j < _len; j++)
					ceph_decode_32_safe(&p, end,
							    cap_auths[i].match.gids[j],
							    bad);
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
								  GFP_KERNEL);
				if (!cap_auths[i].match.path) {
					pr_err_client(cl, "No memory for path\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.path, _len);

				/* Remove the trailing '/' */
				while (_len && cap_auths[i].match.path[_len - 1] == '/') {
					cap_auths[i].match.path[_len - 1] = '\0';
					_len -= 1;
				}
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
								     GFP_KERNEL);
				if (!cap_auths[i].match.fs_name) {
					pr_err_client(cl, "No memory for fs_name\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
			}

			ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
			doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
			      cap_auths[i].match.uid, cap_auths[i].match.num_gids,
			      cap_auths[i].match.path, cap_auths[i].match.fs_name,
			      cap_auths[i].match.root_squash,
			      cap_auths[i].readable, cap_auths[i].writeable);
		}
	}

skip_cap_auths:
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_OPEN) {
		if (mdsc->s_cap_auths) {
			for (i = 0; i < mdsc->s_cap_auths_num; i++) {
				kfree(mdsc->s_cap_auths[i].match.gids);
				kfree(mdsc->s_cap_auths[i].match.path);
				kfree(mdsc->s_cap_auths[i].match.fs_name);
			}
			kfree(mdsc->s_cap_auths);
		}
		mdsc->s_cap_auths_num = cap_auths_num;
		mdsc->s_cap_auths = cap_auths;
	}
	if (op == CEPH_SESSION_CLOSE) {
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
	      ceph_session_op_name(op), session,
	      ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info_client(cl, "mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect success\n",
				       session->s_mds);

		session->s_features = features;
		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
			pr_notice_client(cl, "mds%d is already opened\n",
					 session->s_mds);
		} else {
			session->s_state = CEPH_MDS_SESSION_OPEN;
			renewed_caps(mdsc, session, 0);
			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
				     &session->s_features))
				metric_schedule_delayed(&mdsc->metric);
		}

		/*
		 * The connection may be broken and the session on the
		 * client side may have been reinitialized, so update the
		 * seq anyway.
		 */
		if (!session->s_seq && seq)
			session->s_seq = seq;

		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect denied\n",
				       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_CLOSED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info_client(cl, "mds%d caps went stale, renewing\n",
			       session->s_mds);
		atomic_inc(&session->s_cap_gen);
		session->s_cap_ttl = jiffies - 1;
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		/* flush cap releases */
		spin_lock(&session->s_cap_lock);
		if (session->s_num_cap_releases)
			ceph_flush_session_cap_releases(mdsc, session);
		spin_unlock(&session->s_cap_lock);

		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		doutc(cl, "force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info_client(cl, "mds%d rejected session\n",
			       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		if (blocklisted)
			mdsc->fsc->blocklisted = true;
		wake = 2; /* for good measure */
		break;

	default:
		pr_err_client(cl, "bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err_client(cl, "corrupt message mds%d len %d\n", mds,
		      (int)msg->front.iov_len);
	ceph_msg_dump(msg);
fail:
	for (i = 0; i < cap_auths_num; i++) {
		kfree(cap_auths[i].match.gids);
		kfree(cap_auths[i].match.path);
		kfree(cap_auths[i].match.fs_name);
	}
	kfree(cap_auths);
	return;
}

void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
{
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	int dcaps;

	dcaps = xchg(&req->r_dir_caps, 0);
	if (dcaps) {
		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
	}
}

void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req)
{
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	int dcaps;

	dcaps = xchg(&req->r_dir_caps, 0);
	if (dcaps) {
		doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
		ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps);
	}
}

/*
 * called under session->mutex.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;

	doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
		__send_request(session, req, true);

	/*
	 * Also re-send old requests when the MDS enters the reconnect
	 * stage, so that the MDS can process completed requests during
	 * its clientreplay stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (!req->r_session)
			continue;
		if (req->r_session->s_mds != session->s_mds)
			continue;

		ceph_mdsc_release_dir_caps_async(req);

		__send_request(session, req, true);
	}
	mutex_unlock(&mdsc->mutex);
}

static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding realms) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding realms */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	ceph_pagelist_release(recon_state->pagelist);

	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}
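
/*
 * Illustrative sketch (added annotation) of the placeholder-patching
 * pattern used above: reserve a 32-bit slot at the head of a pagelist,
 * encode a variable number of entries, then map the first page and
 * patch the final count in place.  encode_next_entry() is a
 * hypothetical stand-in; the pagelist and kmap calls are the real APIs.
 *
 *	struct ceph_pagelist *pl = ceph_pagelist_alloc(GFP_NOFS);
 *	struct page *page;
 *	__le32 *addr;
 *	u32 count = 0;
 *
 *	ceph_pagelist_encode_32(pl, 0);		placeholder
 *	while (encode_next_entry(pl))		hypothetical helper
 *		count++;
 *
 *	page = list_first_entry(&pl->head, struct page, lru);
 *	addr = kmap_atomic(page);
 *	*addr = cpu_to_le32(count);		patch the placeholder
 *	kunmap_atomic(addr);
 */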

static struct dentry* d_find_primary(struct inode *inode)
{
	struct dentry *alias, *dn = NULL;

	if (hlist_empty(&inode->i_dentry))
		return NULL;

	spin_lock(&inode->i_lock);
	if (hlist_empty(&inode->i_dentry))
		goto out_unlock;

	if (S_ISDIR(inode->i_mode)) {
		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
		if (!IS_ROOT(alias))
			dn = dget(alias);
		goto out_unlock;
	}

	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
		spin_lock(&alias->d_lock);
		if (!d_unhashed(alias) &&
		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
			dn = dget_dlock(alias);
		}
		spin_unlock(&alias->d_lock);
		if (dn)
			break;
	}
out_unlock:
	spin_unlock(&inode->i_lock);
	return dn;
}

/*
 * Encode information about a cap for a reconnect with the MDS.
 */
static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct dentry *dentry;
	struct ceph_cap *cap;
	struct ceph_path_info path_info = {0};
	int err;
	u64 snap_follows;

	dentry = d_find_primary(inode);
	if (dentry) {
		/* set pathbase to parent dir when msg_version >= 2 */
		char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info,
						  recon_state->msg_version >= 2);
		dput(dentry);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_err;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		err = 0;
		goto out_err;
	}
	doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
	      ceph_vinop(inode), cap, cap->cap_id,
	      ceph_cap_string(cap->issued));

	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(path_info.vino.ino);
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		struct timespec64 ts;

		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(i_size_read(inode));
		ts = inode_get_mtime(inode);
		ceph_encode_timespec64(&rec.v1.mtime, &ts);
		ts = inode_get_atime(inode);
		ceph_encode_timespec64(&rec.v1.atime, &ts);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(path_info.vino.ino);
	}

	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			     (num_fcntl_locks + num_flock_locks) *
			     sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    path_info.pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	ceph_mdsc_free_path_info(&path_info);
	if (!err)
		recon_state->nr_caps++;
	return err;
}
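
/*
 * Sketch (added annotation) of the count-then-encode retry used above.
 * ceph_count_locks() and ceph_encode_locks_to_buffer() are the real
 * helpers; the loop exists because locks can be added between the count
 * and the encode, in which case the encode reports -ENOSPC and we
 * simply recount:
 *
 *	int nf, nl, err;
 *	struct ceph_filelock *buf;
 * again:
 *	ceph_count_locks(inode, &nf, &nl);
 *	buf = kmalloc_array(nf + nl, sizeof(*buf), GFP_NOFS);
 *	if (!buf)
 *		return -ENOMEM;
 *	err = ceph_encode_locks_to_buffer(inode, buf, nf, nl);
 *	if (err == -ENOSPC) {
 *		kfree(buf);
 *		goto again;
 *	}
 */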

static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
		      realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}

/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the recovering
 * MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info_client(cl, "mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	doutc(cl, "session %p state %s\n", session,
	      ceph_session_state_name(session->s_state));

	atomic_inc(&session->s_cap_gen);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing the cap
	 * reconnect.  If a cap gets released before being added to the
	 * cap reconnect, __ceph_remove_cap() should skip queuing the
	 * cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;

	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					 struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
		      err, mds);
	return;
}

/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i, j, err;
	int oldstate, newstate;
	struct ceph_mds_session *s;
	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);

	if (newmap->m_info) {
		for (i = 0; i < newmap->possible_max_rank; i++) {
			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
				set_bit(newmap->m_info[i].export_targets[j], targets);
		}
	}

	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n",
		      i, ceph_mds_state_name(oldstate),
		      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		      ceph_mds_state_name(newstate),
		      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		      ceph_session_state_name(s->s_state));

		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);

			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			clear_bit(i, targets);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info_client(cl, "mds%d recovery completed\n",
					       s->s_mds);
			kick_requests(mdsc, i);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_kick_flushing_caps(mdsc, s);
			mutex_unlock(&s->s_mutex);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	/*
	 * Only open and reconnect sessions that don't exist yet.
	 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * If the importing MDS crashed just after the EImportStart
		 * journal was flushed, then when a standby MDS takes over
		 * and replays the EImportStart journal, the new MDS daemon
		 * will wait for the client to reconnect -- but the client
		 * may never have registered/opened the session.
		 *
		 * Try to reconnect to that MDS daemon if its rank number
		 * is in the export targets array and it is in the
		 * up:reconnect state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * In rare cases the session may have been registered and
		 * opened by requests that chose random MDSes during the
		 * mdsc->mutex unlock/lock gap below.  But the related MDS
		 * daemon will just queue those requests and keep waiting
		 * for the client's reconnect request in the up:reconnect
		 * state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err_client(cl,
					      "failed to open export target session, err %d\n",
					      err);
				continue;
			}
		}
		doutc(cl, "send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			doutc(cl, " connecting to export targets of laggy mds%d\n", i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}


/*
 * leases
 */

/*
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	doutc(cl, "from mds%d\n", mds);

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
	      vino.ino, inode, dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	if (!inode) {
		doutc(cl, "no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		doutc(cl, "no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
						(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	iput(inode);

	ceph_dec_mds_stopping_blocker(mdsc);
	return;

bad:
	ceph_dec_mds_stopping_blocker(mdsc);

	pr_err_client(cl, "corrupt lease message\n");
	ceph_msg_dump(msg);
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
	      session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);

	ceph_con_send(&session->s_con, msg);
}
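
/*
 * Sketch (added annotation) of the CEPH_MSG_CLIENT_LEASE front laid out
 * by the function above -- a fixed lease header immediately followed by
 * a length-prefixed, non-NUL-terminated dentry name:
 *
 *	struct ceph_mds_lease	action, seq, ino, first/last snap
 *	__le32			dentry name length
 *	char name[]		dentry name bytes
 */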

/*
 * Lock and then unlock the session, to let any ongoing session
 * activity finish.
 */
static void lock_unlock_session(struct ceph_mds_session *s)
{
	mutex_lock(&s->s_mutex);
	mutex_unlock(&s->s_mutex);
}

static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blocklisted))
		return;

	pr_info_client(cl, "auto reconnect after blocklisted\n");
	ceph_force_reconnect(fsc->sb);
}

bool check_session_state(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	switch (s->s_state) {
	case CEPH_MDS_SESSION_OPEN:
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			s->s_state = CEPH_MDS_SESSION_HUNG;
			pr_info_client(cl, "mds%d hung\n", s->s_mds);
		}
		break;
	case CEPH_MDS_SESSION_CLOSING:
	case CEPH_MDS_SESSION_NEW:
	case CEPH_MDS_SESSION_RESTARTING:
	case CEPH_MDS_SESSION_CLOSED:
	case CEPH_MDS_SESSION_REJECTED:
		return false;
	}

	return true;
}

/*
 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
 * then we need to retransmit that request.
 */
void inc_session_sequence(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	lockdep_assert_held(&s->s_mutex);

	s->s_seq++;

	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
		int ret;

		doutc(cl, "resending session close request for mds%d\n", s->s_mds);
		ret = request_close_session(s);
		if (ret < 0)
			pr_err_client(cl, "unable to close session to mds%d: %d\n",
				      s->s_mds, ret);
	}
}

/*
 * delayed work -- periodically trim expired leases, renew caps with mds.  If
 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
 * workqueue delay value of 5 secs will be used.
 */
static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
{
	unsigned long max_delay = HZ * 5;

	/* 5 secs default delay */
	if (!delay || (delay > max_delay))
		delay = max_delay;
	schedule_delayed_work(&mdsc->delayed_work,
			      round_jiffies_relative(delay));
}
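
/*
 * Usage sketch (added annotation): both calls below end up with the
 * same default 5-second cadence, since delays of 0 or anything above
 * 5 * HZ are clamped by the helper:
 *
 *	schedule_delayed(mdsc, 0);
 *	schedule_delayed(mdsc, 60 * HZ);
 */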

static void delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	unsigned long delay;
	int renew_interval;
	int renew_caps;
	int i;

	doutc(mdsc->fsc->client, "mdsc delayed_work\n");

	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
		return;

	mutex_lock(&mdsc->mutex);
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);

		if (!s)
			continue;

		if (!check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}
		mutex_unlock(&mdsc->mutex);

		ceph_flush_session_cap_releases(mdsc, s);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	delay = ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	schedule_delayed(mdsc, delay);
}

int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	init_completion(&mdsc->safe_umount_waiters);
	spin_lock_init(&mdsc->stopping_lock);
	atomic_set(&mdsc->stopping_blockers, 0);
	init_completion(&mdsc->stopping_waiter);
	atomic64_set(&mdsc->dirty_folios, 0);
	init_waitqueue_head(&mdsc->flush_end_wq);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
#ifdef CONFIG_DEBUG_FS
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
#endif
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
	err = ceph_metric_init(&mdsc->metric);
	if (err)
		goto err_mdsmap;

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));

	fsc->mdsc = mdsc;
	return 0;

err_mdsmap:
	kfree(mdsc->mdsmap);
err_mdsc:
	kfree(mdsc);
	return err;
}

/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}

void send_flush_mdlog(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	struct ceph_msg *msg;

	/*
	 * Pre-luminous MDS crashes when it sees an unknown session request
	 */
	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
		return;

	mutex_lock(&s->s_mutex);
	doutc(cl, "request mdlog flush to mds%d (%s) seq %lld\n",
	      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
				      s->s_seq);
	if (!msg) {
		pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
			      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	} else {
		ceph_con_send(&s->s_con, msg);
	}
	mutex_unlock(&s->s_mutex);
}

static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
			       struct ceph_mds_cap_auth *auth,
			       const struct cred *cred,
			       char *tpath)
{
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_client *cl = mdsc->fsc->client;
	const char *fs_name = mdsc->mdsmap->m_fs_name;
	const char *spath = mdsc->fsc->mount_options->server_path;
	bool gid_matched = false;
	u32 gid, tlen, len;
	int i, j;

	doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n",
	      fs_name, auth->match.fs_name ? auth->match.fs_name : "");

	if (!ceph_namespace_match(auth->match.fs_name, fs_name)) {
		/* fsname mismatch, try next one */
		return 0;
	}

	doutc(cl, "match.uid %lld\n", auth->match.uid);
	if (auth->match.uid != MDS_AUTH_UID_ANY) {
		if (auth->match.uid != caller_uid)
			return 0;
		if (auth->match.num_gids) {
			for (i = 0; i < auth->match.num_gids; i++) {
				if (caller_gid == auth->match.gids[i])
					gid_matched = true;
			}
			if (!gid_matched && cred->group_info->ngroups) {
				for (i = 0; i < cred->group_info->ngroups; i++) {
					gid = from_kgid(&init_user_ns,
							cred->group_info->gid[i]);
					for (j = 0; j < auth->match.num_gids; j++) {
						if (gid == auth->match.gids[j]) {
							gid_matched = true;
							break;
						}
					}
					if (gid_matched)
						break;
				}
			}
			if (!gid_matched)
				return 0;
		}
	}

	/* path match */
	if (auth->match.path) {
		if (!tpath)
			return 0;

		tlen = strlen(tpath);
		len = strlen(auth->match.path);
		if (len) {
			char *_tpath = tpath;
			bool free_tpath = false;
			int m, n;

			doutc(cl, "server path %s, tpath %s, match.path %s\n",
			      spath, tpath, auth->match.path);
			if (spath && (m = strlen(spath)) != 1) {
				/* mount path + '/' + tpath + an extra space */
				n = m + 1 + tlen + 1;
				_tpath = kmalloc(n, GFP_NOFS);
				if (!_tpath)
					return -ENOMEM;
				/* remove the leading '/' */
				snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
				free_tpath = true;
				tlen = strlen(_tpath);
			}

			/*
			 * Please note the trailing '/' for match.path has
			 * already been removed during parsing.
			 *
			 * Remove the trailing '/' for the target path.
			 */
			while (tlen && _tpath[tlen - 1] == '/') {
				_tpath[tlen - 1] = '\0';
				tlen -= 1;
			}
			doutc(cl, "_tpath %s\n", _tpath);

			/*
			 * In case first == _tpath && tlen == len:
			 *  match.path=/foo  --> /foo _path=/foo   --> match
			 *  match.path=/foo/ --> /foo _path=/foo   --> match
			 *
			 * In case first == _tpath && tlen > len:
			 *  match.path=/foo/ --> /foo _path=/foo/  --> match
			 *  match.path=/foo  --> /foo _path=/foo/  --> match
			 *  match.path=/foo/ --> /foo _path=/foo/d --> match
			 *  match.path=/foo  --> /foo _path=/food  --> mismatch
			 *
			 * All the other cases                     --> mismatch
			 */
			bool path_matched = true;
			char *first = strstr(_tpath, auth->match.path);

			if (first != _tpath ||
			    (tlen > len && _tpath[len] != '/')) {
				path_matched = false;
			}

			if (free_tpath)
				kfree(_tpath);

			if (!path_matched)
				return 0;
		}
	}

	doutc(cl, "matched\n");
	return 1;
}
5785
5786
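/*
 * The strstr()-based check above boils down to "auth->match.path must
 * be an exact prefix of _tpath, and the prefix must end on a path
 * component boundary".  A hypothetical userspace sketch of the same
 * predicate (names are illustrative, not part of the kernel API):
 *
 *	static bool path_prefix_match(const char *match, const char *path)
 *	{
 *		size_t len = strlen(match);
 *
 *		if (strncmp(path, match, len) != 0)
 *			return false;		/* not a prefix at all */
 *		return path[len] == '\0' ||	/* exact match */
 *		       path[len] == '/';	/* component boundary */
 *	}
 *
 * e.g. path_prefix_match("/foo", "/foo/bar") is true, while
 * path_prefix_match("/foo", "/food") is false.
 */
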
int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
{
	const struct cred *cred = get_current_cred();
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_mds_cap_auth *rw_perms_s = NULL;
	struct ceph_client *cl = mdsc->fsc->client;
	bool root_squash_perms = true;
	int i, err;

	doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
	      tpath, mask, caller_uid, caller_gid);

	for (i = 0; i < mdsc->s_cap_auths_num; i++) {
		struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];

		err = ceph_mds_auth_match(mdsc, s, cred, tpath);
		if (err < 0) {
			put_cred(cred);
			return err;
		} else if (err > 0) {
			/* always follow the last auth caps' permission */
			root_squash_perms = true;
			rw_perms_s = NULL;
			if ((mask & MAY_WRITE) && s->writeable &&
			    s->match.root_squash && (!caller_uid || !caller_gid))
				root_squash_perms = false;

			if (((mask & MAY_WRITE) && !s->writeable) ||
			    ((mask & MAY_READ) && !s->readable))
				rw_perms_s = s;
		}
	}

	put_cred(cred);

	doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
	      rw_perms_s);
	if (root_squash_perms && rw_perms_s == NULL) {
		doutc(cl, "access allowed\n");
		return 0;
	}

	if (!root_squash_perms) {
		doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write\n",
		      caller_uid, caller_gid);
	}
	if (rw_perms_s) {
		doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d\n",
		      rw_perms_s->readable, rw_perms_s->writeable,
		      !!(mask & MAY_READ), !!(mask & MAY_WRITE));
	}
	doutc(cl, "access denied\n");
	return -EACCES;
}

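/*
 * A hypothetical call site (the surrounding details are illustrative;
 * see the real callers in fs/ceph for how the path is built):
 *
 *	err = ceph_mds_check_access(mdsc, path, MAY_WRITE);
 *	if (err)
 *		return err;	/* typically -EACCES, or -ENOMEM */
 *
 * A return of 0 means the last matching MDS auth cap grants the
 * requested access, or that no cap matched at all (including the
 * case where no auth caps were configured).
 */
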
/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}

/*
 * flush the mdlog and wait for all outstanding write mds requests
 * to become safe.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* next didn't exist before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}

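/*
 * The loop above is an instance of a common pattern for sleeping in
 * the middle of an rbtree walk: pin both the current and the next
 * node with references, drop mdsc->mutex, block, then re-take the
 * mutex and use RB_EMPTY_NODE() to detect whether the pinned next
 * node was unlinked while we slept.  In outline (pseudocode, not a
 * drop-in helper):
 *
 *	next = rb_next(cur);		// still under the mutex
 *	get(cur); get(next);
 *	unlock(); wait_for(cur); lock();
 *	put(cur);
 *	if (RB_EMPTY_NODE(&next->node)) {
 *		put(next);		// it was removed; rescan
 *		goto restart;
 *	}
 *	put(next);			// still in the tree, won't go away
 */
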
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}

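/*
 * ceph_mdsc_sync() is the MDS half of a full filesystem sync; the
 * VFS ->sync_fs entry point pairs it with an OSD-side flush.  Roughly
 * (a sketch; see ceph_sync_fs() in fs/ceph/super.c for the real
 * version):
 *
 *	static int ceph_sync_fs(struct super_block *sb, int wait)
 *	{
 *		struct ceph_fs_client *fsc = ceph_sb_to_fs_client(sb);
 *
 *		if (!wait)
 *			return 0;	// writeback was already kicked
 *		ceph_osdc_sync(&fsc->client->osdc);
 *		ceph_mdsc_sync(fsc->mdsc);
 *		return 0;
 *	}
 */
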
/*
 * true if all sessions are closed, or we force unmount
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after sb is r/o, or when the metadata is corrupted.
 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}

void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work has stopped before releasing
	 * the resources.
	 *
	 * cancel_delayed_work_sync() only guarantees that the work
	 * finishes executing, and the delayed work will re-arm itself
	 * again after that, so it must be flushed here as well.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	doutc(fsc->client, "%p done\n", mdsc);
}

void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		// info_v, info_cv
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

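/*
 * For reference, the wire layout that the decoder above walks, as
 * summarized from the ceph_decode_* calls (the authoritative encoding
 * lives in the Ceph server sources):
 *
 *	u32 epoch
 *	u8  struct_v, struct_cv;  u32 map_len
 *	u32 epoch (inner), legacy_client_fscid
 *	u32 num_fs
 *	num_fs times:
 *		u8  info_v, info_cv;  u32 info_len
 *		u32 fscid
 *		u32 namelen;  char name[namelen]
 *		... remainder of the filesystem info (skipped here)
 */
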
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
					MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
	return;
}

static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void mds_peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
		       s->s_mds);
	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
	    ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
		send_mds_reconnect(mdsc, s);
}

static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err_client(cl, "received unknown message type %d %s\n",
			      type, ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int mds_add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int mds_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int mds_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static int mds_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}

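/*
 * Taken together, the two handlers above and mds_get_auth_request()
 * implement the client side of the msgr2 authentication exchange:
 * get_auth_request() produces the initial authorizer,
 * handle_auth_reply_more() runs zero or more times while the server
 * asks for another round, and handle_auth_done() installs the session
 * key and connection secret.  In outline (a sketch of the flow as
 * implied by these hooks; the framing itself belongs to the
 * messenger):
 *
 *	client				server
 *	get_auth_request()	 -->
 *				 <--	auth_reply_more (0..n times)
 *	handle_auth_reply_more() -->
 *				 <--	auth_done
 *	handle_auth_done()
 */
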
static int mds_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = mds_get_con,
	.put = mds_put_con,
	.alloc_msg = mds_alloc_msg,
	.dispatch = mds_dispatch,
	.peer_reset = mds_peer_reset,
	.get_authorizer = mds_get_authorizer,
	.add_authorizer_challenge = mds_add_authorizer_challenge,
	.verify_authorizer_reply = mds_verify_authorizer_reply,
	.invalidate_authorizer = mds_invalidate_authorizer,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
	.get_auth_request = mds_get_auth_request,
	.handle_auth_reply_more = mds_handle_auth_reply_more,
	.handle_auth_done = mds_handle_auth_done,
	.handle_auth_bad_method = mds_handle_auth_bad_method,
};

/* eof */