Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/fs/ceph/dir.c
26281 views
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
3
4
#include <linux/spinlock.h>
5
#include <linux/namei.h>
6
#include <linux/slab.h>
7
#include <linux/sched.h>
8
#include <linux/xattr.h>
9
10
#include "super.h"
11
#include "mds_client.h"
12
#include "crypto.h"
13
14
/*
15
* Directory operations: readdir, lookup, create, link, unlink,
16
* rename, etc.
17
*/
18
19
/*
20
* Ceph MDS operations are specified in terms of a base ino and
21
* relative path. Thus, the client can specify an operation on a
22
* specific inode (e.g., a getattr due to fstat(2)), or as a path
23
* relative to, say, the root directory.
24
*
25
* Normally, we limit ourselves to strict inode ops (no path component)
26
* or dentry operations (a single path component relative to an ino). The
27
* exception to this is open_root_dentry(), which will open the mount
28
* point by name.
29
*/
30
31
const struct dentry_operations ceph_dentry_ops;
32
33
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di);
34
static int __dir_lease_try_check(const struct dentry *dentry);
35
36
/*
37
* Initialize ceph dentry state.
38
*/
39
static int ceph_d_init(struct dentry *dentry)
40
{
41
struct ceph_dentry_info *di;
42
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb);
43
44
di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
45
if (!di)
46
return -ENOMEM; /* oh well */
47
48
di->dentry = dentry;
49
di->lease_session = NULL;
50
di->time = jiffies;
51
dentry->d_fsdata = di;
52
INIT_LIST_HEAD(&di->lease_list);
53
54
atomic64_inc(&mdsc->metric.total_dentries);
55
56
return 0;
57
}
58
59
/*
60
* for f_pos for readdir:
61
* - hash order:
62
* (0xff << 52) | ((24 bits hash) << 28) |
63
* (the nth entry has hash collision);
64
* - frag+name order;
65
* ((frag value) << 28) | (the nth entry in frag);
66
*/
67
#define OFFSET_BITS 28
68
#define OFFSET_MASK ((1 << OFFSET_BITS) - 1)
69
#define HASH_ORDER (0xffull << (OFFSET_BITS + 24))
70
loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
71
{
72
loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
73
if (hash_order)
74
fpos |= HASH_ORDER;
75
return fpos;
76
}
77
78
static bool is_hash_order(loff_t p)
79
{
80
return (p & HASH_ORDER) == HASH_ORDER;
81
}
82
83
static unsigned fpos_frag(loff_t p)
84
{
85
return p >> OFFSET_BITS;
86
}
87
88
static unsigned fpos_hash(loff_t p)
89
{
90
return ceph_frag_value(fpos_frag(p));
91
}
92
93
static unsigned fpos_off(loff_t p)
94
{
95
return p & OFFSET_MASK;
96
}
97
98
static int fpos_cmp(loff_t l, loff_t r)
99
{
100
int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
101
if (v)
102
return v;
103
return (int)(fpos_off(l) - fpos_off(r));
104
}
105
106
/*
107
* make note of the last dentry we read, so we can
108
* continue at the same lexicographical point,
109
* regardless of what dir changes take place on the
110
* server.
111
*/
112
static int note_last_dentry(struct ceph_fs_client *fsc,
113
struct ceph_dir_file_info *dfi,
114
const char *name,
115
int len, unsigned next_offset)
116
{
117
char *buf = kmalloc(len+1, GFP_KERNEL);
118
if (!buf)
119
return -ENOMEM;
120
kfree(dfi->last_name);
121
dfi->last_name = buf;
122
memcpy(dfi->last_name, name, len);
123
dfi->last_name[len] = 0;
124
dfi->next_offset = next_offset;
125
doutc(fsc->client, "'%s'\n", dfi->last_name);
126
return 0;
127
}
128
129
130
static struct dentry *
131
__dcache_find_get_entry(struct dentry *parent, u64 idx,
132
struct ceph_readdir_cache_control *cache_ctl)
133
{
134
struct inode *dir = d_inode(parent);
135
struct ceph_client *cl = ceph_inode_to_client(dir);
136
struct dentry *dentry;
137
unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
138
loff_t ptr_pos = idx * sizeof(struct dentry *);
139
pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;
140
141
if (ptr_pos >= i_size_read(dir))
142
return NULL;
143
144
if (!cache_ctl->folio || ptr_pgoff != cache_ctl->folio->index) {
145
ceph_readdir_cache_release(cache_ctl);
146
cache_ctl->folio = filemap_lock_folio(&dir->i_data, ptr_pgoff);
147
if (IS_ERR(cache_ctl->folio)) {
148
cache_ctl->folio = NULL;
149
doutc(cl, " folio %lu not found\n", ptr_pgoff);
150
return ERR_PTR(-EAGAIN);
151
}
152
/* reading/filling the cache are serialized by
153
i_rwsem, no need to use folio lock */
154
folio_unlock(cache_ctl->folio);
155
cache_ctl->dentries = kmap_local_folio(cache_ctl->folio, 0);
156
}
157
158
cache_ctl->index = idx & idx_mask;
159
160
rcu_read_lock();
161
spin_lock(&parent->d_lock);
162
/* check i_size again here, because empty directory can be
163
* marked as complete while not holding the i_rwsem. */
164
if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
165
dentry = cache_ctl->dentries[cache_ctl->index];
166
else
167
dentry = NULL;
168
spin_unlock(&parent->d_lock);
169
if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
170
dentry = NULL;
171
rcu_read_unlock();
172
return dentry ? : ERR_PTR(-EAGAIN);
173
}
174
175
/*
176
* When possible, we try to satisfy a readdir by peeking at the
177
* dcache. We make this work by carefully ordering dentries on
178
* d_children when we initially get results back from the MDS, and
179
* falling back to a "normal" sync readdir if any dentries in the dir
180
* are dropped.
181
*
182
* Complete dir indicates that we have all dentries in the dir. It is
183
* defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
184
* the MDS if/when the directory is modified).
185
*/
186
static int __dcache_readdir(struct file *file, struct dir_context *ctx,
187
int shared_gen)
188
{
189
struct ceph_dir_file_info *dfi = file->private_data;
190
struct dentry *parent = file->f_path.dentry;
191
struct inode *dir = d_inode(parent);
192
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(dir);
193
struct ceph_client *cl = ceph_inode_to_client(dir);
194
struct dentry *dentry, *last = NULL;
195
struct ceph_dentry_info *di;
196
struct ceph_readdir_cache_control cache_ctl = {};
197
u64 idx = 0;
198
int err = 0;
199
200
doutc(cl, "%p %llx.%llx v%u at %llx\n", dir, ceph_vinop(dir),
201
(unsigned)shared_gen, ctx->pos);
202
203
/* search start position */
204
if (ctx->pos > 2) {
205
u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
206
while (count > 0) {
207
u64 step = count >> 1;
208
dentry = __dcache_find_get_entry(parent, idx + step,
209
&cache_ctl);
210
if (!dentry) {
211
/* use linear search */
212
idx = 0;
213
break;
214
}
215
if (IS_ERR(dentry)) {
216
err = PTR_ERR(dentry);
217
goto out;
218
}
219
di = ceph_dentry(dentry);
220
spin_lock(&dentry->d_lock);
221
if (fpos_cmp(di->offset, ctx->pos) < 0) {
222
idx += step + 1;
223
count -= step + 1;
224
} else {
225
count = step;
226
}
227
spin_unlock(&dentry->d_lock);
228
dput(dentry);
229
}
230
231
doutc(cl, "%p %llx.%llx cache idx %llu\n", dir,
232
ceph_vinop(dir), idx);
233
}
234
235
236
for (;;) {
237
bool emit_dentry = false;
238
dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
239
if (!dentry) {
240
dfi->file_info.flags |= CEPH_F_ATEND;
241
err = 0;
242
break;
243
}
244
if (IS_ERR(dentry)) {
245
err = PTR_ERR(dentry);
246
goto out;
247
}
248
249
spin_lock(&dentry->d_lock);
250
di = ceph_dentry(dentry);
251
if (d_unhashed(dentry) ||
252
d_really_is_negative(dentry) ||
253
di->lease_shared_gen != shared_gen ||
254
((dentry->d_flags & DCACHE_NOKEY_NAME) &&
255
fscrypt_has_encryption_key(dir))) {
256
spin_unlock(&dentry->d_lock);
257
dput(dentry);
258
err = -EAGAIN;
259
goto out;
260
}
261
if (fpos_cmp(ctx->pos, di->offset) <= 0) {
262
__ceph_dentry_dir_lease_touch(di);
263
emit_dentry = true;
264
}
265
spin_unlock(&dentry->d_lock);
266
267
if (emit_dentry) {
268
doutc(cl, " %llx dentry %p %pd %p\n", di->offset,
269
dentry, dentry, d_inode(dentry));
270
ctx->pos = di->offset;
271
if (!dir_emit(ctx, dentry->d_name.name,
272
dentry->d_name.len, ceph_present_inode(d_inode(dentry)),
273
d_inode(dentry)->i_mode >> 12)) {
274
dput(dentry);
275
err = 0;
276
break;
277
}
278
ctx->pos++;
279
280
if (last)
281
dput(last);
282
last = dentry;
283
} else {
284
dput(dentry);
285
}
286
}
287
out:
288
ceph_readdir_cache_release(&cache_ctl);
289
if (last) {
290
int ret;
291
di = ceph_dentry(last);
292
ret = note_last_dentry(fsc, dfi, last->d_name.name,
293
last->d_name.len,
294
fpos_off(di->offset) + 1);
295
if (ret < 0)
296
err = ret;
297
dput(last);
298
/* last_name no longer match cache index */
299
if (dfi->readdir_cache_idx >= 0) {
300
dfi->readdir_cache_idx = -1;
301
dfi->dir_release_count = 0;
302
}
303
}
304
return err;
305
}
306
307
static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos)
308
{
309
if (!dfi->last_readdir)
310
return true;
311
if (is_hash_order(pos))
312
return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos));
313
else
314
return dfi->frag != fpos_frag(pos);
315
}
316
317
static int ceph_readdir(struct file *file, struct dir_context *ctx)
318
{
319
struct ceph_dir_file_info *dfi = file->private_data;
320
struct inode *inode = file_inode(file);
321
struct ceph_inode_info *ci = ceph_inode(inode);
322
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
323
struct ceph_mds_client *mdsc = fsc->mdsc;
324
struct ceph_client *cl = fsc->client;
325
int i;
326
int err;
327
unsigned frag = -1;
328
struct ceph_mds_reply_info_parsed *rinfo;
329
330
doutc(cl, "%p %llx.%llx file %p pos %llx\n", inode,
331
ceph_vinop(inode), file, ctx->pos);
332
if (dfi->file_info.flags & CEPH_F_ATEND)
333
return 0;
334
335
/* always start with . and .. */
336
if (ctx->pos == 0) {
337
doutc(cl, "%p %llx.%llx off 0 -> '.'\n", inode,
338
ceph_vinop(inode));
339
if (!dir_emit(ctx, ".", 1, ceph_present_inode(inode),
340
inode->i_mode >> 12))
341
return 0;
342
ctx->pos = 1;
343
}
344
if (ctx->pos == 1) {
345
u64 ino;
346
struct dentry *dentry = file->f_path.dentry;
347
348
spin_lock(&dentry->d_lock);
349
ino = ceph_present_inode(dentry->d_parent->d_inode);
350
spin_unlock(&dentry->d_lock);
351
352
doutc(cl, "%p %llx.%llx off 1 -> '..'\n", inode,
353
ceph_vinop(inode));
354
if (!dir_emit(ctx, "..", 2, ino, inode->i_mode >> 12))
355
return 0;
356
ctx->pos = 2;
357
}
358
359
err = ceph_fscrypt_prepare_readdir(inode);
360
if (err < 0)
361
return err;
362
363
spin_lock(&ci->i_ceph_lock);
364
/* request Fx cap. if have Fx, we don't need to release Fs cap
365
* for later create/unlink. */
366
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR);
367
/* can we use the dcache? */
368
if (ceph_test_mount_opt(fsc, DCACHE) &&
369
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
370
ceph_snap(inode) != CEPH_SNAPDIR &&
371
__ceph_dir_is_complete_ordered(ci) &&
372
__ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
373
int shared_gen = atomic_read(&ci->i_shared_gen);
374
375
spin_unlock(&ci->i_ceph_lock);
376
err = __dcache_readdir(file, ctx, shared_gen);
377
if (err != -EAGAIN)
378
return err;
379
} else {
380
spin_unlock(&ci->i_ceph_lock);
381
}
382
383
/* proceed with a normal readdir */
384
more:
385
/* do we have the correct frag content buffered? */
386
if (need_send_readdir(dfi, ctx->pos)) {
387
struct ceph_mds_request *req;
388
int op = ceph_snap(inode) == CEPH_SNAPDIR ?
389
CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
390
391
/* discard old result, if any */
392
if (dfi->last_readdir) {
393
ceph_mdsc_put_request(dfi->last_readdir);
394
dfi->last_readdir = NULL;
395
}
396
397
if (is_hash_order(ctx->pos)) {
398
/* fragtree isn't always accurate. choose frag
399
* based on previous reply when possible. */
400
if (frag == (unsigned)-1)
401
frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
402
NULL, NULL);
403
} else {
404
frag = fpos_frag(ctx->pos);
405
}
406
407
doutc(cl, "fetching %p %llx.%llx frag %x offset '%s'\n",
408
inode, ceph_vinop(inode), frag, dfi->last_name);
409
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
410
if (IS_ERR(req))
411
return PTR_ERR(req);
412
413
err = ceph_alloc_readdir_reply_buffer(req, inode);
414
if (err) {
415
ceph_mdsc_put_request(req);
416
return err;
417
}
418
/* hints to request -> mds selection code */
419
req->r_direct_mode = USE_AUTH_MDS;
420
if (op == CEPH_MDS_OP_READDIR) {
421
req->r_direct_hash = ceph_frag_value(frag);
422
__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
423
req->r_inode_drop = CEPH_CAP_FILE_EXCL;
424
}
425
if (dfi->last_name) {
426
int len = strlen(dfi->last_name);
427
428
req->r_path2 = kzalloc(NAME_MAX + 1, GFP_KERNEL);
429
if (!req->r_path2) {
430
ceph_mdsc_put_request(req);
431
return -ENOMEM;
432
}
433
memcpy(req->r_path2, dfi->last_name, len);
434
435
err = ceph_encode_encrypted_dname(inode, req->r_path2, len);
436
if (err < 0) {
437
ceph_mdsc_put_request(req);
438
return err;
439
}
440
} else if (is_hash_order(ctx->pos)) {
441
req->r_args.readdir.offset_hash =
442
cpu_to_le32(fpos_hash(ctx->pos));
443
}
444
445
req->r_dir_release_cnt = dfi->dir_release_count;
446
req->r_dir_ordered_cnt = dfi->dir_ordered_count;
447
req->r_readdir_cache_idx = dfi->readdir_cache_idx;
448
req->r_readdir_offset = dfi->next_offset;
449
req->r_args.readdir.frag = cpu_to_le32(frag);
450
req->r_args.readdir.flags =
451
cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
452
453
req->r_inode = inode;
454
ihold(inode);
455
req->r_dentry = dget(file->f_path.dentry);
456
err = ceph_mdsc_do_request(mdsc, NULL, req);
457
if (err < 0) {
458
ceph_mdsc_put_request(req);
459
return err;
460
}
461
doutc(cl, "%p %llx.%llx got and parsed readdir result=%d"
462
"on frag %x, end=%d, complete=%d, hash_order=%d\n",
463
inode, ceph_vinop(inode), err, frag,
464
(int)req->r_reply_info.dir_end,
465
(int)req->r_reply_info.dir_complete,
466
(int)req->r_reply_info.hash_order);
467
468
rinfo = &req->r_reply_info;
469
if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
470
frag = le32_to_cpu(rinfo->dir_dir->frag);
471
if (!rinfo->hash_order) {
472
dfi->next_offset = req->r_readdir_offset;
473
/* adjust ctx->pos to beginning of frag */
474
ctx->pos = ceph_make_fpos(frag,
475
dfi->next_offset,
476
false);
477
}
478
}
479
480
dfi->frag = frag;
481
dfi->last_readdir = req;
482
483
if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
484
dfi->readdir_cache_idx = req->r_readdir_cache_idx;
485
if (dfi->readdir_cache_idx < 0) {
486
/* preclude from marking dir ordered */
487
dfi->dir_ordered_count = 0;
488
} else if (ceph_frag_is_leftmost(frag) &&
489
dfi->next_offset == 2) {
490
/* note dir version at start of readdir so
491
* we can tell if any dentries get dropped */
492
dfi->dir_release_count = req->r_dir_release_cnt;
493
dfi->dir_ordered_count = req->r_dir_ordered_cnt;
494
}
495
} else {
496
doutc(cl, "%p %llx.%llx !did_prepopulate\n", inode,
497
ceph_vinop(inode));
498
/* disable readdir cache */
499
dfi->readdir_cache_idx = -1;
500
/* preclude from marking dir complete */
501
dfi->dir_release_count = 0;
502
}
503
504
/* note next offset and last dentry name */
505
if (rinfo->dir_nr > 0) {
506
struct ceph_mds_reply_dir_entry *rde =
507
rinfo->dir_entries + (rinfo->dir_nr-1);
508
unsigned next_offset = req->r_reply_info.dir_end ?
509
2 : (fpos_off(rde->offset) + 1);
510
err = note_last_dentry(fsc, dfi, rde->name,
511
rde->name_len, next_offset);
512
if (err) {
513
ceph_mdsc_put_request(dfi->last_readdir);
514
dfi->last_readdir = NULL;
515
return err;
516
}
517
} else if (req->r_reply_info.dir_end) {
518
dfi->next_offset = 2;
519
/* keep last name */
520
}
521
}
522
523
rinfo = &dfi->last_readdir->r_reply_info;
524
doutc(cl, "%p %llx.%llx frag %x num %d pos %llx chunk first %llx\n",
525
inode, ceph_vinop(inode), dfi->frag, rinfo->dir_nr, ctx->pos,
526
rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
527
528
i = 0;
529
/* search start position */
530
if (rinfo->dir_nr > 0) {
531
int step, nr = rinfo->dir_nr;
532
while (nr > 0) {
533
step = nr >> 1;
534
if (rinfo->dir_entries[i + step].offset < ctx->pos) {
535
i += step + 1;
536
nr -= step + 1;
537
} else {
538
nr = step;
539
}
540
}
541
}
542
for (; i < rinfo->dir_nr; i++) {
543
struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
544
545
if (rde->offset < ctx->pos) {
546
pr_warn_client(cl,
547
"%p %llx.%llx rde->offset 0x%llx ctx->pos 0x%llx\n",
548
inode, ceph_vinop(inode), rde->offset, ctx->pos);
549
return -EIO;
550
}
551
552
if (WARN_ON_ONCE(!rde->inode.in))
553
return -EIO;
554
555
ctx->pos = rde->offset;
556
doutc(cl, "%p %llx.%llx (%d/%d) -> %llx '%.*s' %p\n", inode,
557
ceph_vinop(inode), i, rinfo->dir_nr, ctx->pos,
558
rde->name_len, rde->name, &rde->inode.in);
559
560
if (!dir_emit(ctx, rde->name, rde->name_len,
561
ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
562
le32_to_cpu(rde->inode.in->mode) >> 12)) {
563
/*
564
* NOTE: Here no need to put the 'dfi->last_readdir',
565
* because when dir_emit stops us it's most likely
566
* doesn't have enough memory, etc. So for next readdir
567
* it will continue.
568
*/
569
doutc(cl, "filldir stopping us...\n");
570
return 0;
571
}
572
573
/* Reset the lengths to their original allocated vals */
574
ctx->pos++;
575
}
576
577
ceph_mdsc_put_request(dfi->last_readdir);
578
dfi->last_readdir = NULL;
579
580
if (dfi->next_offset > 2) {
581
frag = dfi->frag;
582
goto more;
583
}
584
585
/* more frags? */
586
if (!ceph_frag_is_rightmost(dfi->frag)) {
587
frag = ceph_frag_next(dfi->frag);
588
if (is_hash_order(ctx->pos)) {
589
loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
590
dfi->next_offset, true);
591
if (new_pos > ctx->pos)
592
ctx->pos = new_pos;
593
/* keep last_name */
594
} else {
595
ctx->pos = ceph_make_fpos(frag, dfi->next_offset,
596
false);
597
kfree(dfi->last_name);
598
dfi->last_name = NULL;
599
}
600
doutc(cl, "%p %llx.%llx next frag is %x\n", inode,
601
ceph_vinop(inode), frag);
602
goto more;
603
}
604
dfi->file_info.flags |= CEPH_F_ATEND;
605
606
/*
607
* if dir_release_count still matches the dir, no dentries
608
* were released during the whole readdir, and we should have
609
* the complete dir contents in our cache.
610
*/
611
if (atomic64_read(&ci->i_release_count) ==
612
dfi->dir_release_count) {
613
spin_lock(&ci->i_ceph_lock);
614
if (dfi->dir_ordered_count ==
615
atomic64_read(&ci->i_ordered_count)) {
616
doutc(cl, " marking %p %llx.%llx complete and ordered\n",
617
inode, ceph_vinop(inode));
618
/* use i_size to track number of entries in
619
* readdir cache */
620
BUG_ON(dfi->readdir_cache_idx < 0);
621
i_size_write(inode, dfi->readdir_cache_idx *
622
sizeof(struct dentry*));
623
} else {
624
doutc(cl, " marking %llx.%llx complete\n",
625
ceph_vinop(inode));
626
}
627
__ceph_dir_set_complete(ci, dfi->dir_release_count,
628
dfi->dir_ordered_count);
629
spin_unlock(&ci->i_ceph_lock);
630
}
631
doutc(cl, "%p %llx.%llx file %p done.\n", inode, ceph_vinop(inode),
632
file);
633
return 0;
634
}
635
636
static void reset_readdir(struct ceph_dir_file_info *dfi)
637
{
638
if (dfi->last_readdir) {
639
ceph_mdsc_put_request(dfi->last_readdir);
640
dfi->last_readdir = NULL;
641
}
642
kfree(dfi->last_name);
643
dfi->last_name = NULL;
644
dfi->dir_release_count = 0;
645
dfi->readdir_cache_idx = -1;
646
dfi->next_offset = 2; /* compensate for . and .. */
647
dfi->file_info.flags &= ~CEPH_F_ATEND;
648
}
649
650
/*
651
* discard buffered readdir content on seekdir(0), or seek to new frag,
652
* or seek prior to current chunk
653
*/
654
static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos)
655
{
656
struct ceph_mds_reply_info_parsed *rinfo;
657
loff_t chunk_offset;
658
if (new_pos == 0)
659
return true;
660
if (is_hash_order(new_pos)) {
661
/* no need to reset last_name for a forward seek when
662
* dentries are sorted in hash order */
663
} else if (dfi->frag != fpos_frag(new_pos)) {
664
return true;
665
}
666
rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL;
667
if (!rinfo || !rinfo->dir_nr)
668
return true;
669
chunk_offset = rinfo->dir_entries[0].offset;
670
return new_pos < chunk_offset ||
671
is_hash_order(new_pos) != is_hash_order(chunk_offset);
672
}
673
674
static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
675
{
676
struct ceph_dir_file_info *dfi = file->private_data;
677
struct inode *inode = file->f_mapping->host;
678
struct ceph_client *cl = ceph_inode_to_client(inode);
679
loff_t retval;
680
681
inode_lock(inode);
682
retval = -EINVAL;
683
switch (whence) {
684
case SEEK_CUR:
685
offset += file->f_pos;
686
break;
687
case SEEK_SET:
688
break;
689
case SEEK_END:
690
retval = -EOPNOTSUPP;
691
goto out;
692
default:
693
goto out;
694
}
695
696
if (offset >= 0) {
697
if (need_reset_readdir(dfi, offset)) {
698
doutc(cl, "%p %llx.%llx dropping %p content\n",
699
inode, ceph_vinop(inode), file);
700
reset_readdir(dfi);
701
} else if (is_hash_order(offset) && offset > file->f_pos) {
702
/* for hash offset, we don't know if a forward seek
703
* is within same frag */
704
dfi->dir_release_count = 0;
705
dfi->readdir_cache_idx = -1;
706
}
707
708
if (offset != file->f_pos) {
709
file->f_pos = offset;
710
dfi->file_info.flags &= ~CEPH_F_ATEND;
711
}
712
retval = offset;
713
}
714
out:
715
inode_unlock(inode);
716
return retval;
717
}
718
719
/*
720
* Handle lookups for the hidden .snap directory.
721
*/
722
struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
723
struct dentry *dentry)
724
{
725
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
726
struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */
727
struct ceph_client *cl = ceph_inode_to_client(parent);
728
729
/* .snap dir? */
730
if (ceph_snap(parent) == CEPH_NOSNAP &&
731
strcmp(dentry->d_name.name, fsc->mount_options->snapdir_name) == 0) {
732
struct dentry *res;
733
struct inode *inode = ceph_get_snapdir(parent);
734
735
res = d_splice_alias(inode, dentry);
736
doutc(cl, "ENOENT on snapdir %p '%pd', linking to "
737
"snapdir %p %llx.%llx. Spliced dentry %p\n",
738
dentry, dentry, inode, ceph_vinop(inode), res);
739
if (res)
740
dentry = res;
741
}
742
return dentry;
743
}
744
745
/*
746
* Figure out final result of a lookup/open request.
747
*
748
* Mainly, make sure we return the final req->r_dentry (if it already
749
* existed) in place of the original VFS-provided dentry when they
750
* differ.
751
*
752
* Gracefully handle the case where the MDS replies with -ENOENT and
753
* no trace (which it may do, at its discretion, e.g., if it doesn't
754
* care to issue a lease on the negative dentry).
755
*/
756
struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
757
struct dentry *dentry, int err)
758
{
759
struct ceph_client *cl = req->r_mdsc->fsc->client;
760
761
if (err == -ENOENT) {
762
/* no trace? */
763
err = 0;
764
if (!req->r_reply_info.head->is_dentry) {
765
doutc(cl,
766
"ENOENT and no trace, dentry %p inode %llx.%llx\n",
767
dentry, ceph_vinop(d_inode(dentry)));
768
if (d_really_is_positive(dentry)) {
769
d_drop(dentry);
770
err = -ENOENT;
771
} else {
772
d_add(dentry, NULL);
773
}
774
}
775
}
776
if (err)
777
dentry = ERR_PTR(err);
778
else if (dentry != req->r_dentry)
779
dentry = dget(req->r_dentry); /* we got spliced */
780
else
781
dentry = NULL;
782
return dentry;
783
}
784
785
static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
786
{
787
return ceph_ino(inode) == CEPH_INO_ROOT &&
788
strncmp(dentry->d_name.name, ".ceph", 5) == 0;
789
}
790
791
/*
792
* Look up a single dir entry. If there is a lookup intent, inform
793
* the MDS so that it gets our 'caps wanted' value in a single op.
794
*/
795
static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
796
unsigned int flags)
797
{
798
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dir->i_sb);
799
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
800
struct ceph_client *cl = fsc->client;
801
struct ceph_mds_request *req;
802
int op;
803
int mask;
804
int err;
805
806
doutc(cl, "%p %llx.%llx/'%pd' dentry %p\n", dir, ceph_vinop(dir),
807
dentry, dentry);
808
809
if (dentry->d_name.len > NAME_MAX)
810
return ERR_PTR(-ENAMETOOLONG);
811
812
if (IS_ENCRYPTED(dir)) {
813
bool had_key = fscrypt_has_encryption_key(dir);
814
815
err = fscrypt_prepare_lookup_partial(dir, dentry);
816
if (err < 0)
817
return ERR_PTR(err);
818
819
/* mark directory as incomplete if it has been unlocked */
820
if (!had_key && fscrypt_has_encryption_key(dir))
821
ceph_dir_clear_complete(dir);
822
}
823
824
/* can we conclude ENOENT locally? */
825
if (d_really_is_negative(dentry)) {
826
struct ceph_inode_info *ci = ceph_inode(dir);
827
struct ceph_dentry_info *di = ceph_dentry(dentry);
828
829
spin_lock(&ci->i_ceph_lock);
830
doutc(cl, " dir %llx.%llx flags are 0x%lx\n",
831
ceph_vinop(dir), ci->i_ceph_flags);
832
if (strncmp(dentry->d_name.name,
833
fsc->mount_options->snapdir_name,
834
dentry->d_name.len) &&
835
!is_root_ceph_dentry(dir, dentry) &&
836
ceph_test_mount_opt(fsc, DCACHE) &&
837
__ceph_dir_is_complete(ci) &&
838
__ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
839
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
840
spin_unlock(&ci->i_ceph_lock);
841
doutc(cl, " dir %llx.%llx complete, -ENOENT\n",
842
ceph_vinop(dir));
843
d_add(dentry, NULL);
844
di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
845
return NULL;
846
}
847
spin_unlock(&ci->i_ceph_lock);
848
}
849
850
op = ceph_snap(dir) == CEPH_SNAPDIR ?
851
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
852
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
853
if (IS_ERR(req))
854
return ERR_CAST(req);
855
req->r_dentry = dget(dentry);
856
req->r_num_caps = 2;
857
858
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
859
if (ceph_security_xattr_wanted(dir))
860
mask |= CEPH_CAP_XATTR_SHARED;
861
req->r_args.getattr.mask = cpu_to_le32(mask);
862
863
ihold(dir);
864
req->r_parent = dir;
865
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
866
err = ceph_mdsc_do_request(mdsc, NULL, req);
867
if (err == -ENOENT) {
868
struct dentry *res;
869
870
res = ceph_handle_snapdir(req, dentry);
871
if (IS_ERR(res)) {
872
err = PTR_ERR(res);
873
} else {
874
dentry = res;
875
err = 0;
876
}
877
}
878
dentry = ceph_finish_lookup(req, dentry, err);
879
ceph_mdsc_put_request(req); /* will dput(dentry) */
880
doutc(cl, "result=%p\n", dentry);
881
return dentry;
882
}
883
884
/*
885
* If we do a create but get no trace back from the MDS, follow up with
886
* a lookup (the VFS expects us to link up the provided dentry).
887
*/
888
int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
889
{
890
struct dentry *result = ceph_lookup(dir, dentry, 0);
891
892
if (result && !IS_ERR(result)) {
893
/*
894
* We created the item, then did a lookup, and found
895
* it was already linked to another inode we already
896
* had in our cache (and thus got spliced). To not
897
* confuse VFS (especially when inode is a directory),
898
* we don't link our dentry to that inode, return an
899
* error instead.
900
*
901
* This event should be rare and it happens only when
902
* we talk to old MDS. Recent MDS does not send traceless
903
* reply for request that creates new inode.
904
*/
905
d_drop(result);
906
return -ESTALE;
907
}
908
return PTR_ERR(result);
909
}
910
911
static int ceph_mknod(struct mnt_idmap *idmap, struct inode *dir,
912
struct dentry *dentry, umode_t mode, dev_t rdev)
913
{
914
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
915
struct ceph_client *cl = mdsc->fsc->client;
916
struct ceph_mds_request *req;
917
struct ceph_acl_sec_ctx as_ctx = {};
918
int err;
919
920
if (ceph_snap(dir) != CEPH_NOSNAP)
921
return -EROFS;
922
923
err = ceph_wait_on_conflict_unlink(dentry);
924
if (err)
925
return err;
926
927
if (ceph_quota_is_max_files_exceeded(dir)) {
928
err = -EDQUOT;
929
goto out;
930
}
931
932
doutc(cl, "%p %llx.%llx/'%pd' dentry %p mode 0%ho rdev %d\n",
933
dir, ceph_vinop(dir), dentry, dentry, mode, rdev);
934
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
935
if (IS_ERR(req)) {
936
err = PTR_ERR(req);
937
goto out;
938
}
939
940
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
941
if (IS_ERR(req->r_new_inode)) {
942
err = PTR_ERR(req->r_new_inode);
943
req->r_new_inode = NULL;
944
goto out_req;
945
}
946
947
if (S_ISREG(mode) && IS_ENCRYPTED(dir))
948
set_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags);
949
950
req->r_dentry = dget(dentry);
951
req->r_num_caps = 2;
952
req->r_parent = dir;
953
ihold(dir);
954
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
955
req->r_mnt_idmap = mnt_idmap_get(idmap);
956
req->r_args.mknod.mode = cpu_to_le32(mode);
957
req->r_args.mknod.rdev = cpu_to_le32(rdev);
958
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
959
CEPH_CAP_XATTR_EXCL;
960
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
961
962
ceph_as_ctx_to_req(req, &as_ctx);
963
964
err = ceph_mdsc_do_request(mdsc, dir, req);
965
if (!err && !req->r_reply_info.head->is_dentry)
966
err = ceph_handle_notrace_create(dir, dentry);
967
out_req:
968
ceph_mdsc_put_request(req);
969
out:
970
if (!err)
971
ceph_init_inode_acls(d_inode(dentry), &as_ctx);
972
else
973
d_drop(dentry);
974
ceph_release_acl_sec_ctx(&as_ctx);
975
return err;
976
}
977
978
static int ceph_create(struct mnt_idmap *idmap, struct inode *dir,
979
struct dentry *dentry, umode_t mode, bool excl)
980
{
981
return ceph_mknod(idmap, dir, dentry, mode, 0);
982
}
983
984
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
985
static int prep_encrypted_symlink_target(struct ceph_mds_request *req,
986
const char *dest)
987
{
988
int err;
989
int len = strlen(dest);
990
struct fscrypt_str osd_link = FSTR_INIT(NULL, 0);
991
992
err = fscrypt_prepare_symlink(req->r_parent, dest, len, PATH_MAX,
993
&osd_link);
994
if (err)
995
goto out;
996
997
err = fscrypt_encrypt_symlink(req->r_new_inode, dest, len, &osd_link);
998
if (err)
999
goto out;
1000
1001
req->r_path2 = kmalloc(CEPH_BASE64_CHARS(osd_link.len) + 1, GFP_KERNEL);
1002
if (!req->r_path2) {
1003
err = -ENOMEM;
1004
goto out;
1005
}
1006
1007
len = ceph_base64_encode(osd_link.name, osd_link.len, req->r_path2);
1008
req->r_path2[len] = '\0';
1009
out:
1010
fscrypt_fname_free_buffer(&osd_link);
1011
return err;
1012
}
1013
#else
1014
static int prep_encrypted_symlink_target(struct ceph_mds_request *req,
1015
const char *dest)
1016
{
1017
return -EOPNOTSUPP;
1018
}
1019
#endif
1020
1021
static int ceph_symlink(struct mnt_idmap *idmap, struct inode *dir,
1022
struct dentry *dentry, const char *dest)
1023
{
1024
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1025
struct ceph_client *cl = mdsc->fsc->client;
1026
struct ceph_mds_request *req;
1027
struct ceph_acl_sec_ctx as_ctx = {};
1028
umode_t mode = S_IFLNK | 0777;
1029
int err;
1030
1031
if (ceph_snap(dir) != CEPH_NOSNAP)
1032
return -EROFS;
1033
1034
err = ceph_wait_on_conflict_unlink(dentry);
1035
if (err)
1036
return err;
1037
1038
if (ceph_quota_is_max_files_exceeded(dir)) {
1039
err = -EDQUOT;
1040
goto out;
1041
}
1042
1043
doutc(cl, "%p %llx.%llx/'%pd' to '%s'\n", dir, ceph_vinop(dir), dentry,
1044
dest);
1045
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
1046
if (IS_ERR(req)) {
1047
err = PTR_ERR(req);
1048
goto out;
1049
}
1050
1051
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
1052
if (IS_ERR(req->r_new_inode)) {
1053
err = PTR_ERR(req->r_new_inode);
1054
req->r_new_inode = NULL;
1055
goto out_req;
1056
}
1057
1058
req->r_parent = dir;
1059
ihold(dir);
1060
1061
if (IS_ENCRYPTED(req->r_new_inode)) {
1062
err = prep_encrypted_symlink_target(req, dest);
1063
if (err)
1064
goto out_req;
1065
} else {
1066
req->r_path2 = kstrdup(dest, GFP_KERNEL);
1067
if (!req->r_path2) {
1068
err = -ENOMEM;
1069
goto out_req;
1070
}
1071
}
1072
1073
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1074
req->r_mnt_idmap = mnt_idmap_get(idmap);
1075
req->r_dentry = dget(dentry);
1076
req->r_num_caps = 2;
1077
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
1078
CEPH_CAP_XATTR_EXCL;
1079
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1080
1081
ceph_as_ctx_to_req(req, &as_ctx);
1082
1083
err = ceph_mdsc_do_request(mdsc, dir, req);
1084
if (!err && !req->r_reply_info.head->is_dentry)
1085
err = ceph_handle_notrace_create(dir, dentry);
1086
out_req:
1087
ceph_mdsc_put_request(req);
1088
out:
1089
if (err)
1090
d_drop(dentry);
1091
ceph_release_acl_sec_ctx(&as_ctx);
1092
return err;
1093
}
1094
1095
static struct dentry *ceph_mkdir(struct mnt_idmap *idmap, struct inode *dir,
1096
struct dentry *dentry, umode_t mode)
1097
{
1098
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1099
struct ceph_client *cl = mdsc->fsc->client;
1100
struct ceph_mds_request *req;
1101
struct ceph_acl_sec_ctx as_ctx = {};
1102
struct dentry *ret;
1103
int err;
1104
int op;
1105
1106
err = ceph_wait_on_conflict_unlink(dentry);
1107
if (err)
1108
return ERR_PTR(err);
1109
1110
if (ceph_snap(dir) == CEPH_SNAPDIR) {
1111
/* mkdir .snap/foo is a MKSNAP */
1112
op = CEPH_MDS_OP_MKSNAP;
1113
doutc(cl, "mksnap %llx.%llx/'%pd' dentry %p\n",
1114
ceph_vinop(dir), dentry, dentry);
1115
} else if (ceph_snap(dir) == CEPH_NOSNAP) {
1116
doutc(cl, "mkdir %llx.%llx/'%pd' dentry %p mode 0%ho\n",
1117
ceph_vinop(dir), dentry, dentry, mode);
1118
op = CEPH_MDS_OP_MKDIR;
1119
} else {
1120
ret = ERR_PTR(-EROFS);
1121
goto out;
1122
}
1123
1124
if (op == CEPH_MDS_OP_MKDIR &&
1125
ceph_quota_is_max_files_exceeded(dir)) {
1126
ret = ERR_PTR(-EDQUOT);
1127
goto out;
1128
}
1129
if ((op == CEPH_MDS_OP_MKSNAP) && IS_ENCRYPTED(dir) &&
1130
!fscrypt_has_encryption_key(dir)) {
1131
ret = ERR_PTR(-ENOKEY);
1132
goto out;
1133
}
1134
1135
1136
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1137
if (IS_ERR(req)) {
1138
ret = ERR_CAST(req);
1139
goto out;
1140
}
1141
1142
mode |= S_IFDIR;
1143
req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
1144
if (IS_ERR(req->r_new_inode)) {
1145
ret = ERR_CAST(req->r_new_inode);
1146
req->r_new_inode = NULL;
1147
goto out_req;
1148
}
1149
1150
req->r_dentry = dget(dentry);
1151
req->r_num_caps = 2;
1152
req->r_parent = dir;
1153
ihold(dir);
1154
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1155
if (op == CEPH_MDS_OP_MKDIR)
1156
req->r_mnt_idmap = mnt_idmap_get(idmap);
1157
req->r_args.mkdir.mode = cpu_to_le32(mode);
1158
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
1159
CEPH_CAP_XATTR_EXCL;
1160
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1161
1162
ceph_as_ctx_to_req(req, &as_ctx);
1163
1164
err = ceph_mdsc_do_request(mdsc, dir, req);
1165
if (!err &&
1166
!req->r_reply_info.head->is_target &&
1167
!req->r_reply_info.head->is_dentry)
1168
err = ceph_handle_notrace_create(dir, dentry);
1169
ret = ERR_PTR(err);
1170
out_req:
1171
if (!IS_ERR(ret) && req->r_dentry != dentry)
1172
/* Some other dentry was spliced in */
1173
ret = dget(req->r_dentry);
1174
ceph_mdsc_put_request(req);
1175
out:
1176
if (!IS_ERR(ret)) {
1177
if (ret)
1178
dentry = ret;
1179
ceph_init_inode_acls(d_inode(dentry), &as_ctx);
1180
} else {
1181
d_drop(dentry);
1182
}
1183
ceph_release_acl_sec_ctx(&as_ctx);
1184
return ret;
1185
}
1186
1187
static int ceph_link(struct dentry *old_dentry, struct inode *dir,
1188
struct dentry *dentry)
1189
{
1190
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
1191
struct ceph_client *cl = mdsc->fsc->client;
1192
struct ceph_mds_request *req;
1193
int err;
1194
1195
if (dentry->d_flags & DCACHE_DISCONNECTED)
1196
return -EINVAL;
1197
1198
err = ceph_wait_on_conflict_unlink(dentry);
1199
if (err)
1200
return err;
1201
1202
if (ceph_snap(dir) != CEPH_NOSNAP)
1203
return -EROFS;
1204
1205
err = fscrypt_prepare_link(old_dentry, dir, dentry);
1206
if (err)
1207
return err;
1208
1209
doutc(cl, "%p %llx.%llx/'%pd' to '%pd'\n", dir, ceph_vinop(dir),
1210
old_dentry, dentry);
1211
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
1212
if (IS_ERR(req)) {
1213
d_drop(dentry);
1214
return PTR_ERR(req);
1215
}
1216
req->r_dentry = dget(dentry);
1217
req->r_num_caps = 2;
1218
req->r_old_dentry = dget(old_dentry);
1219
/*
1220
* The old_dentry maybe a DCACHE_DISCONNECTED dentry, then we
1221
* will just pass the ino# to MDSs.
1222
*/
1223
if (old_dentry->d_flags & DCACHE_DISCONNECTED)
1224
req->r_ino2 = ceph_vino(d_inode(old_dentry));
1225
req->r_parent = dir;
1226
ihold(dir);
1227
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1228
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1229
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1230
/* release LINK_SHARED on source inode (mds will lock it) */
1231
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
1232
err = ceph_mdsc_do_request(mdsc, dir, req);
1233
if (err) {
1234
d_drop(dentry);
1235
} else if (!req->r_reply_info.head->is_dentry) {
1236
ihold(d_inode(old_dentry));
1237
d_instantiate(dentry, d_inode(old_dentry));
1238
}
1239
ceph_mdsc_put_request(req);
1240
return err;
1241
}
1242
1243
static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
1244
struct ceph_mds_request *req)
1245
{
1246
struct dentry *dentry = req->r_dentry;
1247
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
1248
struct ceph_client *cl = fsc->client;
1249
struct ceph_dentry_info *di = ceph_dentry(dentry);
1250
int result = req->r_err ? req->r_err :
1251
le32_to_cpu(req->r_reply_info.head->result);
1252
1253
if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
1254
pr_warn_client(cl,
1255
"dentry %p:%pd async unlink bit is not set\n",
1256
dentry, dentry);
1257
1258
spin_lock(&fsc->async_unlink_conflict_lock);
1259
hash_del_rcu(&di->hnode);
1260
spin_unlock(&fsc->async_unlink_conflict_lock);
1261
1262
spin_lock(&dentry->d_lock);
1263
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1264
wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
1265
spin_unlock(&dentry->d_lock);
1266
1267
synchronize_rcu();
1268
1269
if (result == -EJUKEBOX)
1270
goto out;
1271
1272
/* If op failed, mark everyone involved for errors */
1273
if (result) {
1274
int pathlen = 0;
1275
u64 base = 0;
1276
char *path = ceph_mdsc_build_path(mdsc, dentry, &pathlen,
1277
&base, 0);
1278
1279
/* mark error on parent + clear complete */
1280
mapping_set_error(req->r_parent->i_mapping, result);
1281
ceph_dir_clear_complete(req->r_parent);
1282
1283
/* drop the dentry -- we don't know its status */
1284
if (!d_unhashed(dentry))
1285
d_drop(dentry);
1286
1287
/* mark inode itself for an error (since metadata is bogus) */
1288
mapping_set_error(req->r_old_inode->i_mapping, result);
1289
1290
pr_warn_client(cl, "failure path=(%llx)%s result=%d!\n",
1291
base, IS_ERR(path) ? "<<bad>>" : path, result);
1292
ceph_mdsc_free_path(path, pathlen);
1293
}
1294
out:
1295
iput(req->r_old_inode);
1296
ceph_mdsc_release_dir_caps(req);
1297
}
1298
1299
static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry)
1300
{
1301
struct ceph_inode_info *ci = ceph_inode(dir);
1302
struct ceph_dentry_info *di;
1303
int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK;
1304
1305
spin_lock(&ci->i_ceph_lock);
1306
if ((__ceph_caps_issued(ci, NULL) & want) == want) {
1307
ceph_take_cap_refs(ci, want, false);
1308
got = want;
1309
}
1310
spin_unlock(&ci->i_ceph_lock);
1311
1312
/* If we didn't get anything, return 0 */
1313
if (!got)
1314
return 0;
1315
1316
spin_lock(&dentry->d_lock);
1317
di = ceph_dentry(dentry);
1318
/*
1319
* - We are holding Fx, which implies Fs caps.
1320
* - Only support async unlink for primary linkage
1321
*/
1322
if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen ||
1323
!(di->flags & CEPH_DENTRY_PRIMARY_LINK))
1324
want = 0;
1325
spin_unlock(&dentry->d_lock);
1326
1327
/* Do we still want what we've got? */
1328
if (want == got)
1329
return got;
1330
1331
ceph_put_cap_refs(ci, got);
1332
return 0;
1333
}
1334
1335
/*
1336
* rmdir and unlink are differ only by the metadata op code
1337
*/
1338
static int ceph_unlink(struct inode *dir, struct dentry *dentry)
1339
{
1340
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dir->i_sb);
1341
struct ceph_client *cl = fsc->client;
1342
struct ceph_mds_client *mdsc = fsc->mdsc;
1343
struct inode *inode = d_inode(dentry);
1344
struct ceph_mds_request *req;
1345
bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
1346
struct dentry *dn;
1347
int err = -EROFS;
1348
int op;
1349
char *path;
1350
int pathlen;
1351
u64 pathbase;
1352
1353
if (ceph_snap(dir) == CEPH_SNAPDIR) {
1354
/* rmdir .snap/foo is RMSNAP */
1355
doutc(cl, "rmsnap %llx.%llx/'%pd' dn\n", ceph_vinop(dir),
1356
dentry);
1357
op = CEPH_MDS_OP_RMSNAP;
1358
} else if (ceph_snap(dir) == CEPH_NOSNAP) {
1359
doutc(cl, "unlink/rmdir %llx.%llx/'%pd' inode %llx.%llx\n",
1360
ceph_vinop(dir), dentry, ceph_vinop(inode));
1361
op = d_is_dir(dentry) ?
1362
CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
1363
} else
1364
goto out;
1365
1366
dn = d_find_alias(dir);
1367
if (!dn) {
1368
try_async = false;
1369
} else {
1370
path = ceph_mdsc_build_path(mdsc, dn, &pathlen, &pathbase, 0);
1371
if (IS_ERR(path)) {
1372
try_async = false;
1373
err = 0;
1374
} else {
1375
err = ceph_mds_check_access(mdsc, path, MAY_WRITE);
1376
}
1377
ceph_mdsc_free_path(path, pathlen);
1378
dput(dn);
1379
1380
/* For none EACCES cases will let the MDS do the mds auth check */
1381
if (err == -EACCES) {
1382
return err;
1383
} else if (err < 0) {
1384
try_async = false;
1385
err = 0;
1386
}
1387
}
1388
1389
retry:
1390
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1391
if (IS_ERR(req)) {
1392
err = PTR_ERR(req);
1393
goto out;
1394
}
1395
req->r_dentry = dget(dentry);
1396
req->r_num_caps = 2;
1397
req->r_parent = dir;
1398
ihold(dir);
1399
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1400
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1401
req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
1402
1403
if (try_async && op == CEPH_MDS_OP_UNLINK &&
1404
(req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
1405
struct ceph_dentry_info *di = ceph_dentry(dentry);
1406
1407
doutc(cl, "async unlink on %llx.%llx/'%pd' caps=%s",
1408
ceph_vinop(dir), dentry,
1409
ceph_cap_string(req->r_dir_caps));
1410
set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
1411
req->r_callback = ceph_async_unlink_cb;
1412
req->r_old_inode = d_inode(dentry);
1413
ihold(req->r_old_inode);
1414
1415
spin_lock(&dentry->d_lock);
1416
di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
1417
spin_unlock(&dentry->d_lock);
1418
1419
spin_lock(&fsc->async_unlink_conflict_lock);
1420
hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
1421
dentry->d_name.hash);
1422
spin_unlock(&fsc->async_unlink_conflict_lock);
1423
1424
err = ceph_mdsc_submit_request(mdsc, dir, req);
1425
if (!err) {
1426
/*
1427
* We have enough caps, so we assume that the unlink
1428
* will succeed. Fix up the target inode and dcache.
1429
*/
1430
drop_nlink(inode);
1431
d_delete(dentry);
1432
} else {
1433
spin_lock(&fsc->async_unlink_conflict_lock);
1434
hash_del_rcu(&di->hnode);
1435
spin_unlock(&fsc->async_unlink_conflict_lock);
1436
1437
spin_lock(&dentry->d_lock);
1438
di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
1439
spin_unlock(&dentry->d_lock);
1440
1441
if (err == -EJUKEBOX) {
1442
try_async = false;
1443
ceph_mdsc_put_request(req);
1444
goto retry;
1445
}
1446
}
1447
} else {
1448
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1449
err = ceph_mdsc_do_request(mdsc, dir, req);
1450
if (!err && !req->r_reply_info.head->is_dentry)
1451
d_delete(dentry);
1452
}
1453
1454
ceph_mdsc_put_request(req);
1455
out:
1456
return err;
1457
}
1458
1459
static int ceph_rename(struct mnt_idmap *idmap, struct inode *old_dir,
1460
struct dentry *old_dentry, struct inode *new_dir,
1461
struct dentry *new_dentry, unsigned int flags)
1462
{
1463
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(old_dir->i_sb);
1464
struct ceph_client *cl = mdsc->fsc->client;
1465
struct ceph_mds_request *req;
1466
int op = CEPH_MDS_OP_RENAME;
1467
int err;
1468
1469
if (flags)
1470
return -EINVAL;
1471
1472
if (ceph_snap(old_dir) != ceph_snap(new_dir))
1473
return -EXDEV;
1474
if (ceph_snap(old_dir) != CEPH_NOSNAP) {
1475
if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
1476
op = CEPH_MDS_OP_RENAMESNAP;
1477
else
1478
return -EROFS;
1479
}
1480
/* don't allow cross-quota renames */
1481
if ((old_dir != new_dir) &&
1482
(!ceph_quota_is_same_realm(old_dir, new_dir)))
1483
return -EXDEV;
1484
1485
err = ceph_wait_on_conflict_unlink(new_dentry);
1486
if (err)
1487
return err;
1488
1489
err = fscrypt_prepare_rename(old_dir, old_dentry, new_dir, new_dentry,
1490
flags);
1491
if (err)
1492
return err;
1493
1494
doutc(cl, "%llx.%llx/'%pd' to %llx.%llx/'%pd'\n",
1495
ceph_vinop(old_dir), old_dentry, ceph_vinop(new_dir),
1496
new_dentry);
1497
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
1498
if (IS_ERR(req))
1499
return PTR_ERR(req);
1500
ihold(old_dir);
1501
req->r_dentry = dget(new_dentry);
1502
req->r_num_caps = 2;
1503
req->r_old_dentry = dget(old_dentry);
1504
req->r_old_dentry_dir = old_dir;
1505
req->r_parent = new_dir;
1506
ihold(new_dir);
1507
set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
1508
req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1509
req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
1510
req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_XATTR_EXCL;
1511
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
1512
/* release LINK_RDCACHE on source inode (mds will lock it) */
1513
req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
1514
if (d_really_is_positive(new_dentry)) {
1515
req->r_inode_drop =
1516
ceph_drop_caps_for_unlink(d_inode(new_dentry));
1517
}
1518
err = ceph_mdsc_do_request(mdsc, old_dir, req);
1519
if (!err && !req->r_reply_info.head->is_dentry) {
1520
/*
1521
* Normally d_move() is done by fill_trace (called by
1522
* do_request, above). If there is no trace, we need
1523
* to do it here.
1524
*/
1525
d_move(old_dentry, new_dentry);
1526
}
1527
ceph_mdsc_put_request(req);
1528
return err;
1529
}
1530
1531
/*
1532
* Move dentry to tail of mdsc->dentry_leases list when lease is updated.
1533
* Leases at front of the list will expire first. (Assume all leases have
1534
* similar duration)
1535
*
1536
* Called under dentry->d_lock.
1537
*/
1538
void __ceph_dentry_lease_touch(struct ceph_dentry_info *di)
1539
{
1540
struct dentry *dn = di->dentry;
1541
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dn->d_sb)->mdsc;
1542
struct ceph_client *cl = mdsc->fsc->client;
1543
1544
doutc(cl, "%p %p '%pd'\n", di, dn, dn);
1545
1546
di->flags |= CEPH_DENTRY_LEASE_LIST;
1547
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
1548
di->flags |= CEPH_DENTRY_REFERENCED;
1549
return;
1550
}
1551
1552
spin_lock(&mdsc->dentry_list_lock);
1553
list_move_tail(&di->lease_list, &mdsc->dentry_leases);
1554
spin_unlock(&mdsc->dentry_list_lock);
1555
}
1556
1557
static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc,
1558
struct ceph_dentry_info *di)
1559
{
1560
di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED);
1561
di->lease_gen = 0;
1562
di->time = jiffies;
1563
list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases);
1564
}
1565
1566
/*
1567
* When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases
1568
* list if it's not in the list, otherwise set 'referenced' flag.
1569
*
1570
* Called under dentry->d_lock.
1571
*/
1572
void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di)
1573
{
1574
struct dentry *dn = di->dentry;
1575
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dn->d_sb)->mdsc;
1576
struct ceph_client *cl = mdsc->fsc->client;
1577
1578
doutc(cl, "%p %p '%pd' (offset 0x%llx)\n", di, dn, dn, di->offset);
1579
1580
if (!list_empty(&di->lease_list)) {
1581
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
1582
/* don't remove dentry from dentry lease list
1583
* if its lease is valid */
1584
if (__dentry_lease_is_valid(di))
1585
return;
1586
} else {
1587
di->flags |= CEPH_DENTRY_REFERENCED;
1588
return;
1589
}
1590
}
1591
1592
if (di->flags & CEPH_DENTRY_SHRINK_LIST) {
1593
di->flags |= CEPH_DENTRY_REFERENCED;
1594
di->flags &= ~CEPH_DENTRY_LEASE_LIST;
1595
return;
1596
}
1597
1598
spin_lock(&mdsc->dentry_list_lock);
1599
__dentry_dir_lease_touch(mdsc, di);
1600
spin_unlock(&mdsc->dentry_list_lock);
1601
}
1602
1603
static void __dentry_lease_unlist(struct ceph_dentry_info *di)
1604
{
1605
struct ceph_mds_client *mdsc;
1606
if (di->flags & CEPH_DENTRY_SHRINK_LIST)
1607
return;
1608
if (list_empty(&di->lease_list))
1609
return;
1610
1611
mdsc = ceph_sb_to_fs_client(di->dentry->d_sb)->mdsc;
1612
spin_lock(&mdsc->dentry_list_lock);
1613
list_del_init(&di->lease_list);
1614
spin_unlock(&mdsc->dentry_list_lock);
1615
}
1616
1617
enum {
1618
KEEP = 0,
1619
DELETE = 1,
1620
TOUCH = 2,
1621
STOP = 4,
1622
};
1623
1624
struct ceph_lease_walk_control {
1625
bool dir_lease;
1626
bool expire_dir_lease;
1627
unsigned long nr_to_scan;
1628
unsigned long dir_lease_ttl;
1629
};
1630
1631
static int __dir_lease_check(const struct dentry *, struct ceph_lease_walk_control *);
1632
static int __dentry_lease_check(const struct dentry *);
1633
1634
static unsigned long
1635
__dentry_leases_walk(struct ceph_mds_client *mdsc,
1636
struct ceph_lease_walk_control *lwc)
1637
{
1638
struct ceph_dentry_info *di, *tmp;
1639
struct dentry *dentry, *last = NULL;
1640
struct list_head* list;
1641
LIST_HEAD(dispose);
1642
unsigned long freed = 0;
1643
int ret = 0;
1644
1645
list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases;
1646
spin_lock(&mdsc->dentry_list_lock);
1647
list_for_each_entry_safe(di, tmp, list, lease_list) {
1648
if (!lwc->nr_to_scan)
1649
break;
1650
--lwc->nr_to_scan;
1651
1652
dentry = di->dentry;
1653
if (last == dentry)
1654
break;
1655
1656
if (!spin_trylock(&dentry->d_lock))
1657
continue;
1658
1659
if (__lockref_is_dead(&dentry->d_lockref)) {
1660
list_del_init(&di->lease_list);
1661
goto next;
1662
}
1663
1664
if (lwc->dir_lease)
1665
ret = __dir_lease_check(dentry, lwc);
1666
else
1667
ret = __dentry_lease_check(dentry);
1668
if (ret & TOUCH) {
1669
/* move it into tail of dir lease list */
1670
__dentry_dir_lease_touch(mdsc, di);
1671
if (!last)
1672
last = dentry;
1673
}
1674
if (ret & DELETE) {
1675
/* stale lease */
1676
di->flags &= ~CEPH_DENTRY_REFERENCED;
1677
if (dentry->d_lockref.count > 0) {
1678
/* update_dentry_lease() will re-add
1679
* it to lease list, or
1680
* ceph_d_delete() will return 1 when
1681
* last reference is dropped */
1682
list_del_init(&di->lease_list);
1683
} else {
1684
di->flags |= CEPH_DENTRY_SHRINK_LIST;
1685
list_move_tail(&di->lease_list, &dispose);
1686
dget_dlock(dentry);
1687
}
1688
}
1689
next:
1690
spin_unlock(&dentry->d_lock);
1691
if (ret & STOP)
1692
break;
1693
}
1694
spin_unlock(&mdsc->dentry_list_lock);
1695
1696
while (!list_empty(&dispose)) {
1697
di = list_first_entry(&dispose, struct ceph_dentry_info,
1698
lease_list);
1699
dentry = di->dentry;
1700
spin_lock(&dentry->d_lock);
1701
1702
list_del_init(&di->lease_list);
1703
di->flags &= ~CEPH_DENTRY_SHRINK_LIST;
1704
if (di->flags & CEPH_DENTRY_REFERENCED) {
1705
spin_lock(&mdsc->dentry_list_lock);
1706
if (di->flags & CEPH_DENTRY_LEASE_LIST) {
1707
list_add_tail(&di->lease_list,
1708
&mdsc->dentry_leases);
1709
} else {
1710
__dentry_dir_lease_touch(mdsc, di);
1711
}
1712
spin_unlock(&mdsc->dentry_list_lock);
1713
} else {
1714
freed++;
1715
}
1716
1717
spin_unlock(&dentry->d_lock);
1718
/* ceph_d_delete() does the trick */
1719
dput(dentry);
1720
}
1721
return freed;
1722
}
1723
1724
static int __dentry_lease_check(const struct dentry *dentry)
1725
{
1726
struct ceph_dentry_info *di = ceph_dentry(dentry);
1727
int ret;
1728
1729
if (__dentry_lease_is_valid(di))
1730
return STOP;
1731
ret = __dir_lease_try_check(dentry);
1732
if (ret == -EBUSY)
1733
return KEEP;
1734
if (ret > 0)
1735
return TOUCH;
1736
return DELETE;
1737
}
1738
1739
static int __dir_lease_check(const struct dentry *dentry,
1740
struct ceph_lease_walk_control *lwc)
1741
{
1742
struct ceph_dentry_info *di = ceph_dentry(dentry);
1743
1744
int ret = __dir_lease_try_check(dentry);
1745
if (ret == -EBUSY)
1746
return KEEP;
1747
if (ret > 0) {
1748
if (time_before(jiffies, di->time + lwc->dir_lease_ttl))
1749
return STOP;
1750
/* Move dentry to tail of dir lease list if we don't want
1751
* to delete it. So dentries in the list are checked in a
1752
* round robin manner */
1753
if (!lwc->expire_dir_lease)
1754
return TOUCH;
1755
if (dentry->d_lockref.count > 0 ||
1756
(di->flags & CEPH_DENTRY_REFERENCED))
1757
return TOUCH;
1758
/* invalidate dir lease */
1759
di->lease_shared_gen = 0;
1760
}
1761
return DELETE;
1762
}
1763
1764
int ceph_trim_dentries(struct ceph_mds_client *mdsc)
1765
{
1766
struct ceph_lease_walk_control lwc;
1767
unsigned long count;
1768
unsigned long freed;
1769
1770
spin_lock(&mdsc->caps_list_lock);
1771
if (mdsc->caps_use_max > 0 &&
1772
mdsc->caps_use_count > mdsc->caps_use_max)
1773
count = mdsc->caps_use_count - mdsc->caps_use_max;
1774
else
1775
count = 0;
1776
spin_unlock(&mdsc->caps_list_lock);
1777
1778
lwc.dir_lease = false;
1779
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2;
1780
freed = __dentry_leases_walk(mdsc, &lwc);
1781
if (!lwc.nr_to_scan) /* more invalid leases */
1782
return -EAGAIN;
1783
1784
if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE)
1785
lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;
1786
1787
lwc.dir_lease = true;
1788
lwc.expire_dir_lease = freed < count;
1789
lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
1790
freed +=__dentry_leases_walk(mdsc, &lwc);
1791
if (!lwc.nr_to_scan) /* more to check */
1792
return -EAGAIN;
1793
1794
return freed > 0 ? 1 : 0;
1795
}
1796
1797
/*
1798
* Ensure a dentry lease will no longer revalidate.
1799
*/
1800
void ceph_invalidate_dentry_lease(struct dentry *dentry)
1801
{
1802
struct ceph_dentry_info *di = ceph_dentry(dentry);
1803
spin_lock(&dentry->d_lock);
1804
di->time = jiffies;
1805
di->lease_shared_gen = 0;
1806
di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;
1807
__dentry_lease_unlist(di);
1808
spin_unlock(&dentry->d_lock);
1809
}
1810
1811
/*
1812
* Check if dentry lease is valid. If not, delete the lease. Try to
1813
* renew if the least is more than half up.
1814
*/
1815
static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
1816
{
1817
struct ceph_mds_session *session;
1818
1819
if (!di->lease_gen)
1820
return false;
1821
1822
session = di->lease_session;
1823
if (session) {
1824
u32 gen;
1825
unsigned long ttl;
1826
1827
gen = atomic_read(&session->s_cap_gen);
1828
ttl = session->s_cap_ttl;
1829
1830
if (di->lease_gen == gen &&
1831
time_before(jiffies, ttl) &&
1832
time_before(jiffies, di->time))
1833
return true;
1834
}
1835
di->lease_gen = 0;
1836
return false;
1837
}
1838
1839
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags)
1840
{
1841
struct ceph_dentry_info *di;
1842
struct ceph_mds_session *session = NULL;
1843
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dentry->d_sb)->mdsc;
1844
struct ceph_client *cl = mdsc->fsc->client;
1845
u32 seq = 0;
1846
int valid = 0;
1847
1848
spin_lock(&dentry->d_lock);
1849
di = ceph_dentry(dentry);
1850
if (di && __dentry_lease_is_valid(di)) {
1851
valid = 1;
1852
1853
if (di->lease_renew_after &&
1854
time_after(jiffies, di->lease_renew_after)) {
1855
/*
1856
* We should renew. If we're in RCU walk mode
1857
* though, we can't do that so just return
1858
* -ECHILD.
1859
*/
1860
if (flags & LOOKUP_RCU) {
1861
valid = -ECHILD;
1862
} else {
1863
session = ceph_get_mds_session(di->lease_session);
1864
seq = di->lease_seq;
1865
di->lease_renew_after = 0;
1866
di->lease_renew_from = jiffies;
1867
}
1868
}
1869
}
1870
spin_unlock(&dentry->d_lock);
1871
1872
if (session) {
1873
ceph_mdsc_lease_send_msg(session, dentry,
1874
CEPH_MDS_LEASE_RENEW, seq);
1875
ceph_put_mds_session(session);
1876
}
1877
doutc(cl, "dentry %p = %d\n", dentry, valid);
1878
return valid;
1879
}
1880
1881
/*
1882
* Called under dentry->d_lock.
1883
*/
1884
static int __dir_lease_try_check(const struct dentry *dentry)
1885
{
1886
struct ceph_dentry_info *di = ceph_dentry(dentry);
1887
struct inode *dir;
1888
struct ceph_inode_info *ci;
1889
int valid = 0;
1890
1891
if (!di->lease_shared_gen)
1892
return 0;
1893
if (IS_ROOT(dentry))
1894
return 0;
1895
1896
dir = d_inode(dentry->d_parent);
1897
ci = ceph_inode(dir);
1898
1899
if (spin_trylock(&ci->i_ceph_lock)) {
1900
if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen &&
1901
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0))
1902
valid = 1;
1903
spin_unlock(&ci->i_ceph_lock);
1904
} else {
1905
valid = -EBUSY;
1906
}
1907
1908
if (!valid)
1909
di->lease_shared_gen = 0;
1910
return valid;
1911
}
1912
1913
/*
1914
* Check if directory-wide content lease/cap is valid.
1915
*/
1916
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry,
1917
struct ceph_mds_client *mdsc)
1918
{
1919
struct ceph_inode_info *ci = ceph_inode(dir);
1920
struct ceph_client *cl = mdsc->fsc->client;
1921
int valid;
1922
int shared_gen;
1923
1924
spin_lock(&ci->i_ceph_lock);
1925
valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
1926
if (valid) {
1927
__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
1928
shared_gen = atomic_read(&ci->i_shared_gen);
1929
}
1930
spin_unlock(&ci->i_ceph_lock);
1931
if (valid) {
1932
struct ceph_dentry_info *di;
1933
spin_lock(&dentry->d_lock);
1934
di = ceph_dentry(dentry);
1935
if (dir == d_inode(dentry->d_parent) &&
1936
di && di->lease_shared_gen == shared_gen)
1937
__ceph_dentry_dir_lease_touch(di);
1938
else
1939
valid = 0;
1940
spin_unlock(&dentry->d_lock);
1941
}
1942
doutc(cl, "dir %p %llx.%llx v%u dentry %p '%pd' = %d\n", dir,
1943
ceph_vinop(dir), (unsigned)atomic_read(&ci->i_shared_gen),
1944
dentry, dentry, valid);
1945
return valid;
1946
}
1947
1948
/*
1949
* Check if cached dentry can be trusted.
1950
*/
1951
static int ceph_d_revalidate(struct inode *dir, const struct qstr *name,
1952
struct dentry *dentry, unsigned int flags)
1953
{
1954
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(dentry->d_sb)->mdsc;
1955
struct ceph_client *cl = mdsc->fsc->client;
1956
int valid = 0;
1957
struct inode *inode;
1958
1959
valid = fscrypt_d_revalidate(dir, name, dentry, flags);
1960
if (valid <= 0)
1961
return valid;
1962
1963
inode = d_inode_rcu(dentry);
1964
1965
doutc(cl, "%p '%pd' inode %p offset 0x%llx nokey %d\n",
1966
dentry, dentry, inode, ceph_dentry(dentry)->offset,
1967
!!(dentry->d_flags & DCACHE_NOKEY_NAME));
1968
1969
mdsc = ceph_sb_to_fs_client(dir->i_sb)->mdsc;
1970
1971
/* always trust cached snapped dentries, snapdir dentry */
1972
if (ceph_snap(dir) != CEPH_NOSNAP) {
1973
doutc(cl, "%p '%pd' inode %p is SNAPPED\n", dentry,
1974
dentry, inode);
1975
valid = 1;
1976
} else if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1977
valid = 1;
1978
} else {
1979
valid = dentry_lease_is_valid(dentry, flags);
1980
if (valid == -ECHILD)
1981
return valid;
1982
if (valid || dir_lease_is_valid(dir, dentry, mdsc)) {
1983
if (inode)
1984
valid = ceph_is_any_caps(inode);
1985
else
1986
valid = 1;
1987
}
1988
}
1989
1990
if (!valid) {
1991
struct ceph_mds_request *req;
1992
int op, err;
1993
u32 mask;
1994
1995
if (flags & LOOKUP_RCU)
1996
return -ECHILD;
1997
1998
percpu_counter_inc(&mdsc->metric.d_lease_mis);
1999
2000
op = ceph_snap(dir) == CEPH_SNAPDIR ?
2001
CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
2002
req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
2003
if (!IS_ERR(req)) {
2004
req->r_dentry = dget(dentry);
2005
req->r_num_caps = 2;
2006
req->r_parent = dir;
2007
ihold(dir);
2008
2009
req->r_dname = name;
2010
2011
mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
2012
if (ceph_security_xattr_wanted(dir))
2013
mask |= CEPH_CAP_XATTR_SHARED;
2014
req->r_args.getattr.mask = cpu_to_le32(mask);
2015
2016
err = ceph_mdsc_do_request(mdsc, NULL, req);
2017
switch (err) {
2018
case 0:
2019
if (d_really_is_positive(dentry) &&
2020
d_inode(dentry) == req->r_target_inode)
2021
valid = 1;
2022
break;
2023
case -ENOENT:
2024
if (d_really_is_negative(dentry))
2025
valid = 1;
2026
fallthrough;
2027
default:
2028
break;
2029
}
2030
ceph_mdsc_put_request(req);
2031
doutc(cl, "%p '%pd', lookup result=%d\n", dentry,
2032
dentry, err);
2033
}
2034
} else {
2035
percpu_counter_inc(&mdsc->metric.d_lease_hit);
2036
}
2037
2038
doutc(cl, "%p '%pd' %s\n", dentry, dentry, valid ? "valid" : "invalid");
2039
if (!valid)
2040
ceph_dir_clear_complete(dir);
2041
return valid;
2042
}
2043
2044
/*
2045
* Delete unused dentry that doesn't have valid lease
2046
*
2047
* Called under dentry->d_lock.
2048
*/
2049
static int ceph_d_delete(const struct dentry *dentry)
2050
{
2051
struct ceph_dentry_info *di;
2052
2053
/* won't release caps */
2054
if (d_really_is_negative(dentry))
2055
return 0;
2056
if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
2057
return 0;
2058
/* valid lease? */
2059
di = ceph_dentry(dentry);
2060
if (di) {
2061
if (__dentry_lease_is_valid(di))
2062
return 0;
2063
if (__dir_lease_try_check(dentry))
2064
return 0;
2065
}
2066
return 1;
2067
}
2068
2069
/*
2070
* Release our ceph_dentry_info.
2071
*/
2072
static void ceph_d_release(struct dentry *dentry)
2073
{
2074
struct ceph_dentry_info *di = ceph_dentry(dentry);
2075
struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
2076
2077
doutc(fsc->client, "dentry %p '%pd'\n", dentry, dentry);
2078
2079
atomic64_dec(&fsc->mdsc->metric.total_dentries);
2080
2081
spin_lock(&dentry->d_lock);
2082
__dentry_lease_unlist(di);
2083
dentry->d_fsdata = NULL;
2084
spin_unlock(&dentry->d_lock);
2085
2086
ceph_put_mds_session(di->lease_session);
2087
kmem_cache_free(ceph_dentry_cachep, di);
2088
}
2089
2090
/*
2091
* When the VFS prunes a dentry from the cache, we need to clear the
2092
* complete flag on the parent directory.
2093
*
2094
* Called under dentry->d_lock.
2095
*/
2096
static void ceph_d_prune(struct dentry *dentry)
2097
{
2098
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb);
2099
struct ceph_client *cl = mdsc->fsc->client;
2100
struct ceph_inode_info *dir_ci;
2101
struct ceph_dentry_info *di;
2102
2103
doutc(cl, "dentry %p '%pd'\n", dentry, dentry);
2104
2105
/* do we have a valid parent? */
2106
if (IS_ROOT(dentry))
2107
return;
2108
2109
/* we hold d_lock, so d_parent is stable */
2110
dir_ci = ceph_inode(d_inode(dentry->d_parent));
2111
if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
2112
return;
2113
2114
/* who calls d_delete() should also disable dcache readdir */
2115
if (d_really_is_negative(dentry))
2116
return;
2117
2118
/* d_fsdata does not get cleared until d_release */
2119
if (!d_unhashed(dentry)) {
2120
__ceph_dir_clear_complete(dir_ci);
2121
return;
2122
}
2123
2124
/* Disable dcache readdir just in case that someone called d_drop()
2125
* or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
2126
* properly (dcache readdir is still enabled) */
2127
di = ceph_dentry(dentry);
2128
if (di->offset > 0 &&
2129
di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
2130
__ceph_dir_clear_ordered(dir_ci);
2131
}
2132
2133
/*
2134
* read() on a dir. This weird interface hack only works if mounted
2135
* with '-o dirstat'.
2136
*/
2137
static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
2138
loff_t *ppos)
2139
{
2140
struct ceph_dir_file_info *dfi = file->private_data;
2141
struct inode *inode = file_inode(file);
2142
struct ceph_inode_info *ci = ceph_inode(inode);
2143
int left;
2144
const int bufsize = 1024;
2145
2146
if (!ceph_test_mount_opt(ceph_sb_to_fs_client(inode->i_sb), DIRSTAT))
2147
return -EISDIR;
2148
2149
if (!dfi->dir_info) {
2150
dfi->dir_info = kmalloc(bufsize, GFP_KERNEL);
2151
if (!dfi->dir_info)
2152
return -ENOMEM;
2153
dfi->dir_info_len =
2154
snprintf(dfi->dir_info, bufsize,
2155
"entries: %20lld\n"
2156
" files: %20lld\n"
2157
" subdirs: %20lld\n"
2158
"rentries: %20lld\n"
2159
" rfiles: %20lld\n"
2160
" rsubdirs: %20lld\n"
2161
"rbytes: %20lld\n"
2162
"rctime: %10lld.%09ld\n",
2163
ci->i_files + ci->i_subdirs,
2164
ci->i_files,
2165
ci->i_subdirs,
2166
ci->i_rfiles + ci->i_rsubdirs,
2167
ci->i_rfiles,
2168
ci->i_rsubdirs,
2169
ci->i_rbytes,
2170
ci->i_rctime.tv_sec,
2171
ci->i_rctime.tv_nsec);
2172
}
2173
2174
if (*ppos >= dfi->dir_info_len)
2175
return 0;
2176
size = min_t(unsigned, size, dfi->dir_info_len-*ppos);
2177
left = copy_to_user(buf, dfi->dir_info + *ppos, size);
2178
if (left == size)
2179
return -EFAULT;
2180
*ppos += (size - left);
2181
return size - left;
2182
}
2183
2184
2185
2186
/*
2187
* Return name hash for a given dentry. This is dependent on
2188
* the parent directory's hash function.
2189
*/
2190
unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
2191
{
2192
struct ceph_inode_info *dci = ceph_inode(dir);
2193
unsigned hash;
2194
2195
switch (dci->i_dir_layout.dl_dir_hash) {
2196
case 0: /* for backward compat */
2197
case CEPH_STR_HASH_LINUX:
2198
return dn->d_name.hash;
2199
2200
default:
2201
spin_lock(&dn->d_lock);
2202
hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
2203
dn->d_name.name, dn->d_name.len);
2204
spin_unlock(&dn->d_lock);
2205
return hash;
2206
}
2207
}
2208
2209
WRAP_DIR_ITER(ceph_readdir) // FIXME!
2210
const struct file_operations ceph_dir_fops = {
2211
.read = ceph_read_dir,
2212
.iterate_shared = shared_ceph_readdir,
2213
.llseek = ceph_dir_llseek,
2214
.open = ceph_open,
2215
.release = ceph_release,
2216
.unlocked_ioctl = ceph_ioctl,
2217
.compat_ioctl = compat_ptr_ioctl,
2218
.fsync = ceph_fsync,
2219
.lock = ceph_lock,
2220
.flock = ceph_flock,
2221
};
2222
2223
const struct file_operations ceph_snapdir_fops = {
2224
.iterate_shared = shared_ceph_readdir,
2225
.llseek = ceph_dir_llseek,
2226
.open = ceph_open,
2227
.release = ceph_release,
2228
};
2229
2230
const struct inode_operations ceph_dir_iops = {
2231
.lookup = ceph_lookup,
2232
.permission = ceph_permission,
2233
.getattr = ceph_getattr,
2234
.setattr = ceph_setattr,
2235
.listxattr = ceph_listxattr,
2236
.get_inode_acl = ceph_get_acl,
2237
.set_acl = ceph_set_acl,
2238
.mknod = ceph_mknod,
2239
.symlink = ceph_symlink,
2240
.mkdir = ceph_mkdir,
2241
.link = ceph_link,
2242
.unlink = ceph_unlink,
2243
.rmdir = ceph_unlink,
2244
.rename = ceph_rename,
2245
.create = ceph_create,
2246
.atomic_open = ceph_atomic_open,
2247
};
2248
2249
const struct inode_operations ceph_snapdir_iops = {
2250
.lookup = ceph_lookup,
2251
.permission = ceph_permission,
2252
.getattr = ceph_getattr,
2253
.mkdir = ceph_mkdir,
2254
.rmdir = ceph_unlink,
2255
.rename = ceph_rename,
2256
};
2257
2258
const struct dentry_operations ceph_dentry_ops = {
2259
.d_revalidate = ceph_d_revalidate,
2260
.d_delete = ceph_d_delete,
2261
.d_release = ceph_d_release,
2262
.d_prune = ceph_d_prune,
2263
.d_init = ceph_d_init,
2264
};
2265
2266