GitHub Repository: torvalds/linux
Path: blob/master/fs/ceph/caps.c
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>
#include <linux/iversion.h>
#include <linux/filelock.h>
#include <linux/jiffies.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "crypto.h"
#include <linux/ceph/decode.h>
#include <linux/ceph/messenger.h>

/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode fields and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of a *_EXCL (exclusive) or FILE_WR capability, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * from at least one MDS server.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the MDS
 * cluster to release server state.
 */

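/*
 * Cap bit layout sketch (editorial orientation only; the authoritative
 * definitions live in include/linux/ceph/ceph_fs.h).  Each inode
 * component (AUTH, LINK, XATTR, FILE) holds a field of generic cap
 * bits (GSHARED, GEXCL, GCACHE, GRD, GWR, ...) at a per-component
 * shift, e.g.:
 *
 *	CEPH_CAP_FILE_SHARED == CEPH_CAP_GSHARED << CEPH_CAP_SFILE
 *	CEPH_CAP_AUTH_EXCL   == CEPH_CAP_GEXCL   << CEPH_CAP_SAUTH
 */
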
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid);

/*
 * Generate readable cap strings for debugging output.
 */
#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static DEFINE_SPINLOCK(cap_str_lock);
static int last_cap_str;

static char *gcap_string(char *s, int c)
{
	if (c & CEPH_CAP_GSHARED)
		*s++ = 's';
	if (c & CEPH_CAP_GEXCL)
		*s++ = 'x';
	if (c & CEPH_CAP_GCACHE)
		*s++ = 'c';
	if (c & CEPH_CAP_GRD)
		*s++ = 'r';
	if (c & CEPH_CAP_GWR)
		*s++ = 'w';
	if (c & CEPH_CAP_GBUFFER)
		*s++ = 'b';
	if (c & CEPH_CAP_GWREXTEND)
		*s++ = 'a';
	if (c & CEPH_CAP_GLAZYIO)
		*s++ = 'l';
	return s;
}

const char *ceph_cap_string(int caps)
{
	int i;
	char *s;
	int c;

	spin_lock(&cap_str_lock);
	i = last_cap_str++;
	if (last_cap_str == MAX_CAP_STR)
		last_cap_str = 0;
	spin_unlock(&cap_str_lock);

	s = cap_str[i];

	if (caps & CEPH_CAP_PIN)
		*s++ = 'p';

	c = (caps >> CEPH_CAP_SAUTH) & 3;
	if (c) {
		*s++ = 'A';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SLINK) & 3;
	if (c) {
		*s++ = 'L';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SXATTR) & 3;
	if (c) {
		*s++ = 'X';
		s = gcap_string(s, c);
	}

	c = caps >> CEPH_CAP_SFILE;
	if (c) {
		*s++ = 'F';
		s = gcap_string(s, c);
	}

	if (s == cap_str[i])
		*s++ = '-';
	*s = 0;
	return cap_str[i];
}

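/*
 * Illustrative use (editorial, not from the original source): for
 * caps == CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD, the 'F' prefix is
 * emitted followed by the gcap_string() letters, yielding "Fsr":
 *
 *	doutc(cl, "issued %s\n", ceph_cap_string(caps));
 *
 * Note the rotating cap_str[] pool: only MAX_CAP_STR strings can be
 * live at once, so results are meant to be consumed immediately in a
 * debug print, not stashed long-term.
 */
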
void ceph_caps_init(struct ceph_mds_client *mdsc)
{
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&mdsc->caps_list_lock);
}

void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
	struct ceph_cap *cap;

	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	mdsc->caps_total_count = 0;
	mdsc->caps_avail_count = 0;
	mdsc->caps_use_count = 0;
	mdsc->caps_reserve_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
			      struct ceph_mount_options *fsopt)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count = fsopt->max_readdir;
	if (mdsc->caps_min_count < 1024)
		mdsc->caps_min_count = 1024;
	mdsc->caps_use_max = fsopt->caps_max;
	if (mdsc->caps_use_max > 0 &&
	    mdsc->caps_use_max < mdsc->caps_min_count)
		mdsc->caps_use_max = mdsc->caps_min_count;
	spin_unlock(&mdsc->caps_list_lock);
}

static void __ceph_unreserve_caps(struct ceph_mds_client *mdsc, int nr_caps)
{
	struct ceph_cap *cap;
	int i;

	if (nr_caps) {
		BUG_ON(mdsc->caps_reserve_count < nr_caps);
		mdsc->caps_reserve_count -= nr_caps;
		if (mdsc->caps_avail_count >=
		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
			mdsc->caps_total_count -= nr_caps;
			for (i = 0; i < nr_caps; i++) {
				cap = list_first_entry(&mdsc->caps_list,
					struct ceph_cap, caps_item);
				list_del(&cap->caps_item);
				kmem_cache_free(ceph_cap_cachep, cap);
			}
		} else {
			mdsc->caps_avail_count += nr_caps;
		}

		doutc(mdsc->fsc->client,
		      "caps %d = %d used + %d resv + %d avail\n",
		      mdsc->caps_total_count, mdsc->caps_use_count,
		      mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
						 mdsc->caps_reserve_count +
						 mdsc->caps_avail_count);
	}
}

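/*
 * Accounting sketch (editorial note, not in the original source): the
 * cap pool maintains the invariant
 *
 *	caps_total_count == caps_use_count + caps_reserve_count
 *			    + caps_avail_count
 *
 * under caps_list_lock; every BUG_ON() in this file that mentions
 * these four counters is re-checking that invariant after caps move
 * between the buckets.
 */
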
/*
 * Called under mdsc->mutex.
 */
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int i, j;
	struct ceph_cap *cap;
	int have;
	int alloc = 0;
	int max_caps;
	int err = 0;
	bool trimmed = false;
	struct ceph_mds_session *s;
	LIST_HEAD(newcaps);

	doutc(cl, "ctx=%p need=%d\n", ctx, need);

	/* first reserve any caps that are already allocated */
	spin_lock(&mdsc->caps_list_lock);
	if (mdsc->caps_avail_count >= need)
		have = need;
	else
		have = mdsc->caps_avail_count;
	mdsc->caps_avail_count -= have;
	mdsc->caps_reserve_count += have;
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	for (i = have; i < need; ) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			list_add(&cap->caps_item, &newcaps);
			alloc++;
			i++;
			continue;
		}

		if (!trimmed) {
			for (j = 0; j < mdsc->max_sessions; j++) {
				s = __ceph_lookup_mds_session(mdsc, j);
				if (!s)
					continue;
				mutex_unlock(&mdsc->mutex);

				mutex_lock(&s->s_mutex);
				max_caps = s->s_nr_caps - (need - i);
				ceph_trim_caps(mdsc, s, max_caps);
				mutex_unlock(&s->s_mutex);

				ceph_put_mds_session(s);
				mutex_lock(&mdsc->mutex);
			}
			trimmed = true;

			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				int more_have;
				if (mdsc->caps_avail_count >= need - i)
					more_have = need - i;
				else
					more_have = mdsc->caps_avail_count;

				i += more_have;
				have += more_have;
				mdsc->caps_avail_count -= more_have;
				mdsc->caps_reserve_count += more_have;
			}
			spin_unlock(&mdsc->caps_list_lock);

			continue;
		}

		pr_warn_client(cl, "ctx=%p ENOMEM need=%d got=%d\n", ctx, need,
			       have + alloc);
		err = -ENOMEM;
		break;
	}

	if (!err) {
		BUG_ON(have + alloc != need);
		ctx->count = need;
		ctx->used = 0;
	}

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &mdsc->caps_list);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);

	if (err)
		__ceph_unreserve_caps(mdsc, have + alloc);

	spin_unlock(&mdsc->caps_list_lock);

	doutc(cl, "ctx=%p %d = %d used + %d resv + %d avail\n", ctx,
	      mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return err;
}

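/*
 * Reservation lifecycle sketch (editorial illustration with a
 * hypothetical caller, not code from this file): a prospective cap
 * consumer reserves up front, draws caps via ceph_get_cap(), then
 * returns whatever is left unused:
 *
 *	struct ceph_cap_reservation rsvd = { 0 };
 *
 *	if (!ceph_reserve_caps(mdsc, &rsvd, 2)) {
 *		struct ceph_cap *cap = ceph_get_cap(mdsc, &rsvd);
 *		...
 *		ceph_unreserve_caps(mdsc, &rsvd);
 *	}
 */
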
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			 struct ceph_cap_reservation *ctx)
{
	struct ceph_client *cl = mdsc->fsc->client;
	bool reclaim = false;
	if (!ctx->count)
		return;

	doutc(cl, "ctx=%p count=%d\n", ctx, ctx->count);
	spin_lock(&mdsc->caps_list_lock);
	__ceph_unreserve_caps(mdsc, ctx->count);
	ctx->count = 0;

	if (mdsc->caps_use_max > 0 &&
	    mdsc->caps_use_count > mdsc->caps_use_max)
		reclaim = true;
	spin_unlock(&mdsc->caps_list_lock);

	if (reclaim)
		ceph_reclaim_caps_nr(mdsc, ctx->used);
}

struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
			      struct ceph_cap_reservation *ctx)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap *cap = NULL;

	/* temporary, until we do something about cap import/export */
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			spin_lock(&mdsc->caps_list_lock);
			mdsc->caps_use_count++;
			mdsc->caps_total_count++;
			spin_unlock(&mdsc->caps_list_lock);
		} else {
			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				BUG_ON(list_empty(&mdsc->caps_list));

				mdsc->caps_avail_count--;
				mdsc->caps_use_count++;
				cap = list_first_entry(&mdsc->caps_list,
						struct ceph_cap, caps_item);
				list_del(&cap->caps_item);

				BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
				       mdsc->caps_reserve_count + mdsc->caps_avail_count);
			}
			spin_unlock(&mdsc->caps_list_lock);
		}

		return cap;
	}

	spin_lock(&mdsc->caps_list_lock);
	doutc(cl, "ctx=%p (%d) %d = %d used + %d resv + %d avail\n", ctx,
	      ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&mdsc->caps_list));

	ctx->count--;
	ctx->used++;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
}

void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
	struct ceph_client *cl = mdsc->fsc->client;

	spin_lock(&mdsc->caps_list_lock);
	doutc(cl, "%p %d = %d used + %d resv + %d avail\n", cap,
	      mdsc->caps_total_count, mdsc->caps_use_count,
	      mdsc->caps_reserve_count, mdsc->caps_avail_count);
	mdsc->caps_use_count--;
	/*
	 * Keep some preallocated caps around (caps_min_count), to
	 * avoid lots of free/alloc churn.
	 */
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &mdsc->caps_list);
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_reservation_status(struct ceph_fs_client *fsc,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	spin_lock(&mdsc->caps_list_lock);

	if (total)
		*total = mdsc->caps_total_count;
	if (avail)
		*avail = mdsc->caps_avail_count;
	if (used)
		*used = mdsc->caps_use_count;
	if (reserved)
		*reserved = mdsc->caps_reserve_count;
	if (min)
		*min = mdsc->caps_min_count;

	spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */
struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;
	struct rb_node *n = ci->i_caps.rb_node;

	while (n) {
		cap = rb_entry(n, struct ceph_cap, ci_node);
		if (mds < cap->mds)
			n = n->rb_left;
		else if (mds > cap->mds)
			n = n->rb_right;
		else
			return cap;
	}
	return NULL;
}

struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	spin_unlock(&ci->i_ceph_lock);
	return cap;
}

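/*
 * Editorial note: ci->i_caps is an rbtree keyed by MDS rank, so an
 * inode holds at most one ceph_cap per MDS.  The locked/unlocked pair
 * above follows this file's double-underscore convention: __helpers
 * assume i_ceph_lock is already held by the caller.
 */
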
/*
 * Called under i_ceph_lock.
 */
static void __insert_cap_node(struct ceph_inode_info *ci,
			      struct ceph_cap *new)
{
	struct rb_node **p = &ci->i_caps.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap *cap = NULL;

	while (*p) {
		parent = *p;
		cap = rb_entry(parent, struct ceph_cap, ci_node);
		if (new->mds < cap->mds)
			p = &(*p)->rb_left;
		else if (new->mds > cap->mds)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->ci_node, parent, p);
	rb_insert_color(&new->ci_node, &ci->i_caps);
}

/*
 * (re)set cap hold timeouts, which control the delayed release
 * of unused caps back to the MDS.  Should be called on cap use.
 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mount_options *opt = mdsc->fsc->mount_options;

	ci->i_hold_caps_max = round_jiffies(jiffies +
					    opt->caps_wanted_delay_max * HZ);
	doutc(mdsc->fsc->client, "%p %llx.%llx %lu\n", inode,
	      ceph_vinop(inode), ci->i_hold_caps_max - jiffies);
}

/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx flags 0x%lx at %lu\n",
	      inode, ceph_vinop(inode), ci->i_ceph_flags,
	      ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		__cap_set_timeouts(mdsc, ci);
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}

/*
 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
 * indicating we should send a cap message to flush dirty metadata
 * asap, and move to the front of the delayed cap list.
 */
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
				      struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	spin_lock(&mdsc->cap_delay_lock);
	ci->i_ceph_flags |= CEPH_I_FLUSH;
	if (!list_empty(&ci->i_cap_delay_list))
		list_del_init(&ci->i_cap_delay_list);
	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Cancel delayed work on cap.
 *
 * Caller must hold i_ceph_lock.
 */
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	if (list_empty(&ci->i_cap_delay_list))
		return;
	spin_lock(&mdsc->cap_delay_lock);
	list_del_init(&ci->i_cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/* Common issue checks for add_cap, handle_cap_grant. */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
			      unsigned issued)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	unsigned had = __ceph_caps_issued(ci, NULL);

	lockdep_assert_held(&ci->i_ceph_lock);

	/*
	 * Each time we receive FILE_CACHE anew, we increment
	 * i_rdcache_gen.
	 */
	if (S_ISREG(ci->netfs.inode.i_mode) &&
	    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
		ci->i_rdcache_gen++;
	}

	/*
	 * If FILE_SHARED is newly issued, mark dir not complete.  We don't
	 * know what happened to this directory while we didn't have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete.  This
	 * stops any ongoing cached readdir.
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
		if (issued & CEPH_CAP_FILE_SHARED)
			atomic_inc(&ci->i_shared_gen);
		if (S_ISDIR(ci->netfs.inode.i_mode)) {
			doutc(cl, " marking %p NOT complete\n", inode);
			__ceph_dir_clear_complete(ci);
		}
	}

	/* Wipe saved layout if we're losing DIR_CREATE caps */
	if (S_ISDIR(ci->netfs.inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
	    !(issued & CEPH_CAP_DIR_CREATE)) {
		ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
		memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
	}
}

/**
 * change_auth_cap_ses - move inode to appropriate lists when auth caps change
 * @ci: inode to be moved
 * @session: new auth caps session
 */
void change_auth_cap_ses(struct ceph_inode_info *ci,
			 struct ceph_mds_session *session)
{
	lockdep_assert_held(&ci->i_ceph_lock);

	if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
		return;

	spin_lock(&session->s_mdsc->cap_dirty_lock);
	if (!list_empty(&ci->i_dirty_item))
		list_move(&ci->i_dirty_item, &session->s_cap_dirty);
	if (!list_empty(&ci->i_flushing_item))
		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
	spin_unlock(&session->s_mdsc->cap_dirty_lock);
}

/*
 * Add a capability under the given MDS session.
 *
 * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
 *
 * @fmode is the open file mode, if we are opening a file, otherwise
 * it is < 0.  (This is so we can atomically add the cap and add an
 * open file reference to it.)
 */
void ceph_add_cap(struct inode *inode,
		  struct ceph_mds_session *session, u64 cap_id,
		  unsigned issued, unsigned wanted,
		  unsigned seq, unsigned mseq, u64 realmino, int flags,
		  struct ceph_cap **new_cap)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int mds = session->s_mds;
	int actual_wanted;
	u32 gen;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "%p %llx.%llx mds%d cap %llx %s seq %d\n", inode,
	      ceph_vinop(inode), session->s_mds, cap_id,
	      ceph_cap_string(issued), seq);

	gen = atomic_read(&session->s_cap_gen);

	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		cap = *new_cap;
		*new_cap = NULL;

		cap->issued = 0;
		cap->implemented = 0;
		cap->mds = mds;
		cap->mds_wanted = 0;
		cap->mseq = 0;

		cap->ci = ci;
		__insert_cap_node(ci, cap);

		/* add to session cap list */
		cap->session = session;
		spin_lock(&session->s_cap_lock);
		list_add_tail(&cap->session_caps, &session->s_caps);
		session->s_nr_caps++;
		atomic64_inc(&mdsc->metric.total_caps);
		spin_unlock(&session->s_cap_lock);
	} else {
		spin_lock(&session->s_cap_lock);
		list_move_tail(&cap->session_caps, &session->s_caps);
		spin_unlock(&session->s_cap_lock);

		if (cap->cap_gen < gen)
			cap->issued = cap->implemented = CEPH_CAP_PIN;

		/*
		 * auth mds of the inode changed.  we received the cap export
		 * message, but still haven't received the cap import message.
		 * handle_cap_export() updated the new auth MDS' cap.
		 *
		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
		 * a message that was sent before the cap import message.  So
		 * don't remove caps.
		 */
		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
			WARN_ON(cap != ci->i_auth_cap);
			WARN_ON(cap->cap_id != cap_id);
			seq = cap->seq;
			mseq = cap->mseq;
			issued |= cap->issued;
			flags |= CEPH_CAP_FLAG_AUTH;
		}
	}

	if (!ci->i_snap_realm ||
	    ((flags & CEPH_CAP_FLAG_AUTH) &&
	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
		/*
		 * add this inode to the appropriate snap realm
		 */
		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
								       realmino);
		if (realm)
			ceph_change_snap_realm(inode, realm);
		else
			WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
			     __func__, realmino, ci->i_vino.ino,
			     ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
	}

	__check_cap_issue(ci, cap, issued);

	/*
	 * If we are issued caps we don't want, or the mds' wanted
	 * value appears to be off, queue a check so we'll release
	 * later and/or update the mds wanted value.
	 */
	actual_wanted = __ceph_caps_wanted(ci);
	if ((wanted & ~actual_wanted) ||
	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
		doutc(cl, "issued %s, mds wanted %s, actual %s, queueing\n",
		      ceph_cap_string(issued), ceph_cap_string(wanted),
		      ceph_cap_string(actual_wanted));
		__cap_delay_requeue(mdsc, ci);
	}

	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			if (ci->i_auth_cap &&
			    ci->i_auth_cap->session != cap->session)
				change_auth_cap_ses(ci, cap->session);
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
		}
	} else {
		WARN_ON(ci->i_auth_cap == cap);
	}

	doutc(cl, "inode %p %llx.%llx cap %p %s now %s seq %d mds%d\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
	      ceph_cap_string(issued|cap->issued), seq, mds);
	cap->cap_id = cap_id;
	cap->issued = issued;
	cap->implemented |= issued;
	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
		cap->mds_wanted = wanted;
	else
		cap->mds_wanted |= wanted;
	cap->seq = seq;
	cap->issue_seq = seq;
	cap->mseq = mseq;
	cap->cap_gen = gen;
	wake_up_all(&ci->i_cap_wq);
}

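/*
 * Editorial note on the issued/implemented pair used throughout this
 * file: 'issued' is what the MDS currently grants, while 'implemented'
 * also includes bits we recently held and may still be exercising.
 * 'implemented & ~issued' is therefore exactly the set of caps in the
 * middle of being revoked.
 */
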
/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_client *cl = cap->session->s_mdsc->fsc->client;
	unsigned long ttl;
	u32 gen;

	gen = atomic_read(&cap->session->s_cap_gen);
	ttl = cap->session->s_cap_ttl;

	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		doutc(cl, "%p %llx.%llx cap %p issued %s but STALE (gen %u vs %u)\n",
		      inode, ceph_vinop(inode), cap,
		      ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}

/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	if (implemented)
		*implemented = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		doutc(cl, "%p %llx.%llx cap %p issued %s\n", inode,
		      ceph_vinop(inode), cap, ceph_cap_string(cap->issued));
		have |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * Exclude caps issued by a non-auth MDS that are currently being
	 * revoked by the auth MDS.  The non-auth MDS should be
	 * revoking/exporting these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}

/*
 * Get cap bits issued by caps other than @ocap
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap == ocap)
			continue;
		if (!__cap_is_valid(cap))
			continue;
		have |= cap->issued;
	}
	return have;
}

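/*
 * Worked example (editorial, hypothetical values): if the auth cap has
 * implemented == "Fscr" but issued == "Fs" (i.e. Fc and Fr are being
 * revoked), then
 *
 *	have &= ~cap->implemented | cap->issued;
 *
 * clears Fc/Fr from 'have' even if a lagging non-auth MDS still
 * reports them as issued, while leaving bits the auth cap never
 * implemented untouched.
 */
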
/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_mds_session *s = cap->session;
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		doutc(cl, "%p %llx.%llx cap %p mds%d\n", inode,
		      ceph_vinop(inode), cap, s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		doutc(cl, "%p %llx.%llx cap %p mds%d NOP, iterating over caps\n",
		      inode, ceph_vinop(inode), cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}

/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * most-recently-used end of their respective LRU lists.  (This is the
 * preferred way for callers to check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct rb_node *p;
	int have = ci->i_snap_caps;

	if ((have & mask) == mask) {
		doutc(cl, "mask %p %llx.%llx snap issued %s (mask %s)\n",
		      inode, ceph_vinop(inode), ceph_cap_string(have),
		      ceph_cap_string(mask));
		return 1;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		if ((cap->issued & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx cap %p issued %s (mask %s)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		have |= cap->issued;
		if ((have & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx combo issued %s (mask %s)\n",
			      inode, ceph_vinop(inode),
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch) {
				struct rb_node *q;

				/* touch this + preceding caps */
				__touch_cap(cap);
				for (q = rb_first(&ci->i_caps); q != p;
				     q = rb_next(q)) {
					cap = rb_entry(q, struct ceph_cap,
						       ci_node);
					if (!__cap_is_valid(cap))
						continue;
					if (cap->issued & mask)
						__touch_cap(cap);
				}
			}
			return 1;
		}
	}

	return 0;
}

int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
				   int touch)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(ci->netfs.inode.i_sb);
	int r;

	r = __ceph_caps_issued_mask(ci, mask, touch);
	if (r)
		ceph_update_cap_hit(&fsc->mdsc->metric);
	else
		ceph_update_cap_mis(&fsc->mdsc->metric);
	return r;
}

/*
 * Return true if mask caps are currently being revoked by an MDS.
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap != ocap &&
		    (cap->implemented & ~cap->issued & mask))
			return 1;
	}
	return 0;
}

int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int used = 0;
	if (ci->i_pin_ref)
		used |= CEPH_CAP_PIN;
	if (ci->i_rd_ref)
		used |= CEPH_CAP_FILE_RD;
	if (ci->i_rdcache_ref ||
	    (S_ISREG(ci->netfs.inode.i_mode) &&
	     ci->netfs.inode.i_data.nrpages))
		used |= CEPH_CAP_FILE_CACHE;
	if (ci->i_wr_ref)
		used |= CEPH_CAP_FILE_WR;
	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		used |= CEPH_CAP_FILE_BUFFER;
	if (ci->i_fx_ref)
		used |= CEPH_CAP_FILE_EXCL;
	return used;
}

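/*
 * Typical caller pattern (editorial illustration, hypothetical call
 * site): a read path that can serve from the page cache checks, under
 * i_ceph_lock,
 *
 *	if (__ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_CACHE, 1))
 *		...serve from cache...
 *
 * The touch=1 argument marks the satisfying caps recently used, which
 * keeps them away from the head of the session's trim list.
 */
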
#define FMODE_WAIT_BIAS 1000

/*
 * wanted, by virtue of open file modes
 */
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
	const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
	const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
	const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
	const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
	struct ceph_mount_options *opt =
		ceph_inode_to_fs_client(&ci->netfs.inode)->mount_options;
	unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
	unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;

	if (S_ISDIR(ci->netfs.inode.i_mode)) {
		int want = 0;

		/* use used_cutoff here, to keep dir's wanted caps longer */
		if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
		    time_after(ci->i_last_rd, used_cutoff))
			want |= CEPH_CAP_ANY_SHARED;

		if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
		    time_after(ci->i_last_wr, used_cutoff)) {
			want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
				want |= CEPH_CAP_ANY_DIR_OPS;
		}

		if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
			want |= CEPH_CAP_PIN;

		return want;
	} else {
		int bits = 0;

		if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
			if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
			    time_after(ci->i_last_rd, used_cutoff))
				bits |= 1 << RD_SHIFT;
		} else if (time_after(ci->i_last_rd, idle_cutoff)) {
			bits |= 1 << RD_SHIFT;
		}

		if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
			if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
			    time_after(ci->i_last_wr, used_cutoff))
				bits |= 1 << WR_SHIFT;
		} else if (time_after(ci->i_last_wr, idle_cutoff)) {
			bits |= 1 << WR_SHIFT;
		}

		/* check lazyio only when read/write is wanted */
		if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
		    ci->i_nr_by_mode[LAZY_SHIFT] > 0)
			bits |= 1 << LAZY_SHIFT;

		return bits ? ceph_caps_for_mode(bits >> 1) : 0;
	}
}

/*
 * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
 */
int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
	int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
	if (S_ISDIR(ci->netfs.inode.i_mode)) {
		/* we want EXCL if holding caps of dir ops */
		if (w & CEPH_CAP_ANY_DIR_OPS)
			w |= CEPH_CAP_FILE_EXCL;
	} else {
		/* we want EXCL if dirty data */
		if (w & CEPH_CAP_FILE_BUFFER)
			w |= CEPH_CAP_FILE_EXCL;
	}
	return w;
}

/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int mds_wanted = 0;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (check && !__cap_is_valid(cap))
			continue;
		if (cap == ci->i_auth_cap)
			mds_wanted |= cap->mds_wanted;
		else
			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
	}
	return mds_wanted;
}

int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_is_any_real_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return ret;
}

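/*
 * Editorial sketch of the mode->caps mapping above: i_nr_by_mode[]
 * counts opens per CEPH_FILE_MODE_* bit, and a mode stays "wanted"
 * for caps_wanted_delay_{min,max} seconds after its last use, so a
 * just-closed file does not immediately drop its caps.  For a regular
 * file open for read, ceph_caps_for_mode() translates FILE_MODE_RD
 * into roughly CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE plus the
 * PIN/SHARED bits (see ceph_fs.h for the exact mapping).
 */
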
/*
 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
 *
 * caller should hold i_ceph_lock.
 * caller will not hold session s_mutex if called from destroy_inode.
 */
void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
	struct ceph_mds_session *session = cap->session;
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc;
	int removed = 0;

	/* 'ci' being NULL means the removal has already occurred */
	if (!ci) {
		doutc(cl, "inode is NULL\n");
		return;
	}

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "%p from %p %llx.%llx\n", cap, inode, ceph_vinop(inode));

	mdsc = ceph_inode_to_fs_client(&ci->netfs.inode)->mdsc;

	/* remove from inode's cap rbtree, and clear auth cap */
	rb_erase(&cap->ci_node, &ci->i_caps);
	if (ci->i_auth_cap == cap)
		ci->i_auth_cap = NULL;

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		doutc(cl, "delaying %p removal from session %p\n", cap,
		      cap->session);
	} else {
		list_del_init(&cap->session_caps);
		session->s_nr_caps--;
		atomic64_dec(&mdsc->metric.total_caps);
		cap->session = NULL;
		removed = 1;
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock.  no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect ||
	     cap->cap_gen == atomic_read(&session->s_cap_gen))) {
		cap->queue_release = 1;
		if (removed) {
			__ceph_queue_cap_release(session, cap);
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	if (removed)
		ceph_put_cap(mdsc, cap);

	if (!__ceph_is_any_real_caps(ci)) {
		/* when a reconnect is denied, we remove session caps
		 * forcibly, so i_wr_ref can be non-zero.  If there are
		 * ongoing writes, keep i_snap_realm.
		 */
		if (ci->i_wr_ref == 0 && ci->i_snap_realm)
			ceph_change_snap_realm(&ci->netfs.inode, NULL);

		__cap_delay_cancel(mdsc, ci);
	}
}

void ceph_remove_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		     bool queue_release)
{
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_fs_client *fsc;

	/* 'ci' being NULL means the removal has already occurred */
	if (!ci) {
		doutc(mdsc->fsc->client, "inode is NULL\n");
		return;
	}

	lockdep_assert_held(&ci->i_ceph_lock);

	fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
	WARN_ON_ONCE(ci->i_auth_cap == cap &&
		     !list_empty(&ci->i_dirty_item) &&
		     !fsc->blocklisted &&
		     !ceph_inode_is_shutdown(&ci->netfs.inode));

	__ceph_remove_cap(cap, queue_release);
}

struct cap_msg_args {
	struct ceph_mds_session *session;
	u64 ino, cid, follows;
	u64 flush_tid, oldest_flush_tid, size, max_size;
	u64 xattr_version;
	u64 change_attr;
	struct ceph_buffer *xattr_buf;
	struct ceph_buffer *old_xattr_buf;
	struct timespec64 atime, mtime, ctime, btime;
	int op, caps, wanted, dirty;
	u32 seq, issue_seq, mseq, time_warp_seq;
	u32 flags;
	kuid_t uid;
	kgid_t gid;
	umode_t mode;
	bool inline_data;
	bool wake;
	bool encrypted;
	u32 fscrypt_auth_len;
	u8 fscrypt_auth[sizeof(struct ceph_fscrypt_auth)]; // for context
};

/* Marshal up the cap msg to the MDS */
static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	void *p;
	struct ceph_mds_client *mdsc = arg->session->s_mdsc;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;

	doutc(mdsc->fsc->client,
	      "%s %llx %llx caps %s wanted %s dirty %s seq %u/%u"
	      " tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	      " xattr_ver %llu xattr_len %d\n",
	      ceph_cap_op_name(arg->op), arg->cid, arg->ino,
	      ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
	      ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
	      arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
	      arg->size, arg->max_size, arg->xattr_version,
	      arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

	msg->hdr.version = cpu_to_le16(12);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

	fc = msg->front.iov_base;
	memset(fc, 0, sizeof(*fc));

	fc->cap_id = cpu_to_le64(arg->cid);
	fc->op = cpu_to_le32(arg->op);
	fc->seq = cpu_to_le32(arg->seq);
	fc->issue_seq = cpu_to_le32(arg->issue_seq);
	fc->migrate_seq = cpu_to_le32(arg->mseq);
	fc->caps = cpu_to_le32(arg->caps);
	fc->wanted = cpu_to_le32(arg->wanted);
	fc->dirty = cpu_to_le32(arg->dirty);
	fc->ino = cpu_to_le64(arg->ino);
	fc->snap_follows = cpu_to_le64(arg->follows);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	if (arg->encrypted)
		fc->size = cpu_to_le64(round_up(arg->size,
						CEPH_FSCRYPT_BLOCK_SIZE));
	else
#endif
		fc->size = cpu_to_le64(arg->size);
	fc->max_size = cpu_to_le64(arg->max_size);
	ceph_encode_timespec64(&fc->mtime, &arg->mtime);
	ceph_encode_timespec64(&fc->atime, &arg->atime);
	ceph_encode_timespec64(&fc->ctime, &arg->ctime);
	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
	fc->mode = cpu_to_le32(arg->mode);

	fc->xattr_version = cpu_to_le64(arg->xattr_version);
	if (arg->xattr_buf) {
		msg->middle = ceph_buffer_get(arg->xattr_buf);
		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
	}

	p = fc + 1;
	/* flock buffer size (version 2) */
	ceph_encode_32(&p, 0);
	/* inline version (version 4) */
	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
	/* inline data size */
	ceph_encode_32(&p, 0);
	/*
	 * osd_epoch_barrier (version 5)
	 * The epoch_barrier is protected by osdc->lock, so READ_ONCE here in
	 * case it was recently changed
	 */
	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
	/* oldest_flush_tid (version 6) */
	ceph_encode_64(&p, arg->oldest_flush_tid);

	/*
	 * caller_uid/caller_gid (version 7)
	 *
	 * Currently, we don't properly track which caller dirtied the caps
	 * last, and force a flush of them when there is a conflict.  For now,
	 * just set this to 0:0, to emulate how the MDS has worked up to now.
	 */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);

	/* pool namespace (version 8) (mds always ignores this) */
	ceph_encode_32(&p, 0);

	/* btime and change_attr (version 9) */
	ceph_encode_timespec64(p, &arg->btime);
	p += sizeof(struct ceph_timespec);
	ceph_encode_64(&p, arg->change_attr);

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	/* dirstats (version 11) - these are r/o on the client */
	ceph_encode_64(&p, 0);
	ceph_encode_64(&p, 0);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	/*
	 * fscrypt_auth and fscrypt_file (version 12)
	 *
	 * fscrypt_auth holds the crypto context (if any). fscrypt_file
	 * tracks the real i_size as an __le64 field (and we use a rounded-up
	 * i_size in the traditional size field).
	 */
	ceph_encode_32(&p, arg->fscrypt_auth_len);
	ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len);
	ceph_encode_32(&p, sizeof(__le64));
	ceph_encode_64(&p, arg->size);
#else /* CONFIG_FS_ENCRYPTION */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);
#endif /* CONFIG_FS_ENCRYPTION */
}

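/*
 * Editorial note: everything encoded after the fixed struct
 * ceph_mds_caps is the versioned message tail; the "(version N)"
 * comments above record which hdr.version introduced each field, so
 * the hdr.version = 12 set at the top must match the newest field
 * actually encoded here.
 */
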
/*
 * Queue cap releases when an inode is dropped from our cache.
 */
void __ceph_remove_caps(struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct rb_node *p;

	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
	 * may call __ceph_caps_issued_mask() on a freeing inode. */
	spin_lock(&ci->i_ceph_lock);
	p = rb_first(&ci->i_caps);
	while (p) {
		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
		p = rb_next(p);
		ceph_remove_cap(mdsc, cap, true);
	}
	spin_unlock(&ci->i_ceph_lock);
}

/*
 * Prepare to send a cap message to an MDS. Update the cap state, and populate
 * the arg struct with the parameters that will need to be sent. This should
 * be done under the i_ceph_lock to guard against changes to cap state.
 *
 * Make note of max_size reported/requested from mds, revoked caps
 * that have now been implemented.
 */
static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
		       int op, int flags, int used, int want, int retain,
		       int flushing, u64 flush_tid, u64 oldest_flush_tid)
{
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int held, revoking;

	lockdep_assert_held(&ci->i_ceph_lock);

	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	retain &= ~revoking;

	doutc(cl, "%p %llx.%llx cap %p session %p %s -> %s (revoking %s)\n",
	      inode, ceph_vinop(inode), cap, cap->session,
	      ceph_cap_string(held), ceph_cap_string(held & retain),
	      ceph_cap_string(revoking));
	BUG_ON((retain & CEPH_CAP_PIN) == 0);

	ci->i_ceph_flags &= ~CEPH_I_FLUSH;

	cap->issued &= retain;  /* drop bits we don't want */
	/*
	 * Wake up any waiters on wanted -> needed transition. This is due to
	 * the weird transition from buffered to sync IO... we need to flush
	 * dirty pages _before_ allowing sync writes to avoid reordering.
	 */
	arg->wake = cap->implemented & ~cap->issued;
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	arg->session = cap->session;
	arg->ino = ceph_vino(inode).ino;
	arg->cid = cap->cap_id;
	arg->follows = flushing ? ci->i_head_snapc->seq : 0;
	arg->flush_tid = flush_tid;
	arg->oldest_flush_tid = oldest_flush_tid;
	arg->size = i_size_read(inode);
	ci->i_reported_size = arg->size;
	arg->max_size = ci->i_wanted_max_size;
	if (cap == ci->i_auth_cap) {
		if (want & CEPH_CAP_ANY_FILE_WR)
			ci->i_requested_max_size = arg->max_size;
		else
			ci->i_requested_max_size = 0;
	}

	if (flushing & CEPH_CAP_XATTR_EXCL) {
		arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
		arg->xattr_version = ci->i_xattrs.version;
		arg->xattr_buf = ceph_buffer_get(ci->i_xattrs.blob);
	} else {
		arg->xattr_buf = NULL;
		arg->old_xattr_buf = NULL;
	}

	arg->mtime = inode_get_mtime(inode);
	arg->atime = inode_get_atime(inode);
	arg->ctime = inode_get_ctime(inode);
	arg->btime = ci->i_btime;
	arg->change_attr = inode_peek_iversion_raw(inode);

	arg->op = op;
	arg->caps = cap->implemented;
	arg->wanted = want;
	arg->dirty = flushing;

	arg->seq = cap->seq;
	arg->issue_seq = cap->issue_seq;
	arg->mseq = cap->mseq;
	arg->time_warp_seq = ci->i_time_warp_seq;

	arg->uid = inode->i_uid;
	arg->gid = inode->i_gid;
	arg->mode = inode->i_mode;

	arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
	    !list_empty(&ci->i_cap_snaps)) {
		struct ceph_cap_snap *capsnap;
		list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->cap_flush.tid)
				break;
			if (capsnap->need_flush) {
				flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
				break;
			}
		}
	}
	arg->flags = flags;
	arg->encrypted = IS_ENCRYPTED(inode);
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
	if (ci->fscrypt_auth_len &&
	    WARN_ON_ONCE(ci->fscrypt_auth_len > sizeof(struct ceph_fscrypt_auth))) {
		/* Don't set this if it's too big */
		arg->fscrypt_auth_len = 0;
	} else {
		arg->fscrypt_auth_len = ci->fscrypt_auth_len;
		memcpy(arg->fscrypt_auth, ci->fscrypt_auth,
		       min_t(size_t, ci->fscrypt_auth_len,
			     sizeof(arg->fscrypt_auth)));
	}
#endif /* CONFIG_FS_ENCRYPTION */
}

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4 + 8)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
	return CAP_MSG_FIXED_FIELDS + arg->fscrypt_auth_len;
}
#else
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
		      4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
	return CAP_MSG_FIXED_FIELDS;
}
#endif /* CONFIG_FS_ENCRYPTION */

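/*
 * Editorial note: the 4/8 byte constants above mirror, in order, the
 * ceph_encode_* calls in encode_cap_msg() after the fixed struct; the
 * extra trailing "+ 8" in the CONFIG_FS_ENCRYPTION variant covers the
 * __le64 fscrypt_file size, and fscrypt_auth_len is added dynamically
 * in cap_msg_size().
 */
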
/*
 * Send a cap msg on the given inode.
 *
 * Caller should hold snap_rwsem (read), s_mutex.
 */
static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
{
	struct ceph_msg *msg;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(arg), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err_client(cl,
			      "error allocating cap msg: ino (%llx.%llx)"
			      " flushing %s tid %llu, requeuing cap.\n",
			      ceph_vinop(inode), ceph_cap_string(arg->dirty),
			      arg->flush_tid);
		spin_lock(&ci->i_ceph_lock);
		__cap_delay_requeue(arg->session->s_mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
		return;
	}

	encode_cap_msg(msg, arg);
	ceph_con_send(&arg->session->s_con, msg);
	ceph_buffer_put(arg->old_xattr_buf);
	ceph_buffer_put(arg->xattr_buf);
	if (arg->wake)
		wake_up_all(&ci->i_cap_wq);
}

static inline int __send_flush_snap(struct inode *inode,
				    struct ceph_mds_session *session,
				    struct ceph_cap_snap *capsnap,
				    u32 mseq, u64 oldest_flush_tid)
{
	struct cap_msg_args arg;
	struct ceph_msg *msg;

	arg.session = session;
	arg.ino = ceph_vino(inode).ino;
	arg.cid = 0;
	arg.follows = capsnap->follows;
	arg.flush_tid = capsnap->cap_flush.tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = capsnap->size;
	arg.max_size = 0;
	arg.xattr_version = capsnap->xattr_version;
	arg.xattr_buf = capsnap->xattr_blob;
	arg.old_xattr_buf = NULL;

	arg.atime = capsnap->atime;
	arg.mtime = capsnap->mtime;
	arg.ctime = capsnap->ctime;
	arg.btime = capsnap->btime;
	arg.change_attr = capsnap->change_attr;

	arg.op = CEPH_CAP_OP_FLUSHSNAP;
	arg.caps = capsnap->issued;
	arg.wanted = 0;
	arg.dirty = capsnap->dirty;

	arg.seq = 0;
	arg.issue_seq = 0;
	arg.mseq = mseq;
	arg.time_warp_seq = capsnap->time_warp_seq;

	arg.uid = capsnap->uid;
	arg.gid = capsnap->gid;
	arg.mode = capsnap->mode;

	arg.inline_data = capsnap->inline_data;
	arg.flags = 0;
	arg.wake = false;
	arg.encrypted = IS_ENCRYPTED(inode);

	/* No fscrypt_auth changes from a capsnap. */
	arg.fscrypt_auth_len = 0;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(&arg),
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	encode_cap_msg(msg, &arg);
	ceph_con_send(&arg.session->s_con, msg);
	return 0;
}

/*
 * When a snapshot is taken, clients accumulate dirty metadata on
 * inodes with capabilities in ceph_cap_snaps to describe the file
 * state at the time the snapshot was taken.  This must be flushed
 * asynchronously back to the MDS once sync writes complete and dirty
 * data is written out.
 *
 * Called under i_ceph_lock.
 */
static void __ceph_flush_snaps(struct ceph_inode_info *ci,
			       struct ceph_mds_session *session)
		__releases(ci->i_ceph_lock)
		__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_snap *capsnap;
	u64 oldest_flush_tid = 0;
	u64 first_tid = 1, last_tid = 0;

	doutc(cl, "%p %llx.%llx session %p\n", inode, ceph_vinop(inode),
	      session);

	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		/*
		 * we need to wait for sync writes to complete and for dirty
		 * pages to be written out.
		 */
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			doutc(cl, "already flushed %p, skipping\n", capsnap);
			continue;
		}

		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf = NULL, *iter;
		int ret;

		if (!(cap && cap->session == session)) {
			doutc(cl, "%p %llx.%llx auth cap %p not mds%d, stop\n",
			      inode, ceph_vinop(inode), cap, session->s_mds);
			break;
		}

		ret = -ENOENT;
		list_for_each_entry(iter, &ci->i_cap_flush_list, i_list) {
			if (iter->tid >= first_tid) {
				cf = iter;
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		doutc(cl, "%p %llx.%llx capsnap %p tid %llu %s\n", inode,
		      ceph_vinop(inode), capsnap, cf->tid,
		      ceph_cap_string(capsnap->dirty));

		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err_client(cl, "error sending cap flushsnap, "
				      "ino (%llx.%llx) tid %llu follows %llu\n",
				      ceph_vinop(inode), cf->tid,
				      capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}

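/*
 * Editorial note: the second loop above re-finds each cap_flush by
 * tid instead of holding a list cursor because __send_flush_snap() is
 * called with i_ceph_lock dropped, so i_cap_flush_list may change
 * between iterations.
 */
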
void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_mds_session *session = NULL;
	bool need_put = false;
	int mds;

	doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		doutc(cl, " no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		doutc(cl, " no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		doutc(cl, " oops, wrong session %p mutex\n", session);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		goto retry;
	}

	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
		__kick_flushing_caps(mdsc, session, ci, 0);

	__ceph_flush_snaps(ci, session);
out:
	spin_unlock(&ci->i_ceph_lock);

	if (psession)
		*psession = session;
	else
		ceph_put_mds_session(session);
	/* we flushed them all; remove this inode from the queue */
	spin_lock(&mdsc->snap_flush_lock);
	if (!list_empty(&ci->i_snap_flush_item))
		need_put = true;
	list_del_init(&ci->i_snap_flush_item);
	spin_unlock(&mdsc->snap_flush_lock);

	if (need_put)
		iput(inode);
}

/*
 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
 * Caller is then responsible for calling __mark_inode_dirty with the
 * returned flags value.
 */
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
			   struct ceph_cap_flush **pcf)
{
	struct ceph_mds_client *mdsc =
		ceph_sb_to_fs_client(ci->netfs.inode.i_sb)->mdsc;
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int was = ci->i_dirty_caps;
	int dirty = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	if (!ci->i_auth_cap) {
		pr_warn_client(cl, "%p %llx.%llx mask %s, "
			       "but no auth cap (session was closed?)\n",
			       inode, ceph_vinop(inode),
			       ceph_cap_string(mask));
		return 0;
	}

	doutc(cl, "%p %llx.%llx %s dirty %s -> %s\n", inode,
	      ceph_vinop(inode), ceph_cap_string(mask),
	      ceph_cap_string(was), ceph_cap_string(was | mask));
	ci->i_dirty_caps |= mask;
	if (was == 0) {
		struct ceph_mds_session *session = ci->i_auth_cap->session;

		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
		swap(ci->i_prealloc_cap_flush, *pcf);

		if (!ci->i_head_snapc) {
			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		doutc(cl, "%p %llx.%llx now dirty snapc %p auth cap %p\n",
		      inode, ceph_vinop(inode), ci->i_head_snapc,
		      ci->i_auth_cap);
		BUG_ON(!list_empty(&ci->i_dirty_item));
		spin_lock(&mdsc->cap_dirty_lock);
		list_add(&ci->i_dirty_item, &session->s_cap_dirty);
		spin_unlock(&mdsc->cap_dirty_lock);
		if (ci->i_flushing_caps == 0) {
			ihold(inode);
			dirty |= I_DIRTY_SYNC;
		}
	} else {
		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
	}
	BUG_ON(list_empty(&ci->i_dirty_item));
	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
	    (mask & CEPH_CAP_FILE_BUFFER))
		dirty |= I_DIRTY_DATASYNC;
	__cap_delay_requeue(mdsc, ci);
	return dirty;
}

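/*
 * Caller pattern sketch (editorial, simplified from setattr-style
 * paths elsewhere in fs/ceph): preallocate the flush record with
 * ceph_alloc_cap_flush() below outside the lock, mark under the lock,
 * then propagate to the VFS:
 *
 *	struct ceph_cap_flush *prealloc_cf = ceph_alloc_cap_flush();
 *	int dirty;
 *
 *	spin_lock(&ci->i_ceph_lock);
 *	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
 *	spin_unlock(&ci->i_ceph_lock);
 *	if (dirty)
 *		__mark_inode_dirty(inode, dirty);
 *	ceph_free_cap_flush(prealloc_cf);
 */
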
struct ceph_cap_flush *ceph_alloc_cap_flush(void)
1840
{
1841
struct ceph_cap_flush *cf;
1842
1843
cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
1844
if (!cf)
1845
return NULL;
1846
1847
cf->is_capsnap = false;
1848
return cf;
1849
}
1850
1851
void ceph_free_cap_flush(struct ceph_cap_flush *cf)
1852
{
1853
if (cf)
1854
kmem_cache_free(ceph_cap_flush_cachep, cf);
1855
}
1856
1857
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
1858
{
1859
if (!list_empty(&mdsc->cap_flush_list)) {
1860
struct ceph_cap_flush *cf =
1861
list_first_entry(&mdsc->cap_flush_list,
1862
struct ceph_cap_flush, g_list);
1863
return cf->tid;
1864
}
1865
return 0;
1866
}
1867
1868
/*
1869
* Remove cap_flush from the mdsc's or inode's flushing cap list.
1870
* Return true if caller needs to wake up flush waiters.
1871
*/
1872
static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
1873
struct ceph_cap_flush *cf)
1874
{
1875
struct ceph_cap_flush *prev;
1876
bool wake = cf->wake;
1877
1878
if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
1879
prev = list_prev_entry(cf, g_list);
1880
prev->wake = true;
1881
wake = false;
1882
}
1883
list_del_init(&cf->g_list);
1884
return wake;
1885
}
1886
1887
static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
1888
struct ceph_cap_flush *cf)
1889
{
1890
struct ceph_cap_flush *prev;
1891
bool wake = cf->wake;
1892
1893
if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
1894
prev = list_prev_entry(cf, i_list);
1895
prev->wake = true;
1896
wake = false;
1897
}
1898
list_del_init(&cf->i_list);
1899
return wake;
1900
}
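/*
 * Worked example with hypothetical tids: if flushes 5, 6 and 7 are
 * pending and tid 6 carries the wake flag, detaching 6 while 5 is
 * still in flight hands the wake flag to 5. Waiters are thus only
 * woken once every flush up to and including their tid has completed.
 */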
1901
1902
/*
1903
* Add the dirty inode to the flushing list. It is assigned a seq number so we
1904
* can wait for caps to flush without starving.
1905
*
1906
* Called under i_ceph_lock. Returns the flush tid.
1907
*/
1908
static u64 __mark_caps_flushing(struct inode *inode,
1909
struct ceph_mds_session *session, bool wake,
1910
u64 *oldest_flush_tid)
1911
{
1912
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
1913
struct ceph_client *cl = ceph_inode_to_client(inode);
1914
struct ceph_inode_info *ci = ceph_inode(inode);
1915
struct ceph_cap_flush *cf = NULL;
1916
int flushing;
1917
1918
lockdep_assert_held(&ci->i_ceph_lock);
1919
BUG_ON(ci->i_dirty_caps == 0);
1920
BUG_ON(list_empty(&ci->i_dirty_item));
1921
BUG_ON(!ci->i_prealloc_cap_flush);
1922
1923
flushing = ci->i_dirty_caps;
1924
doutc(cl, "flushing %s, flushing_caps %s -> %s\n",
1925
ceph_cap_string(flushing),
1926
ceph_cap_string(ci->i_flushing_caps),
1927
ceph_cap_string(ci->i_flushing_caps | flushing));
1928
ci->i_flushing_caps |= flushing;
1929
ci->i_dirty_caps = 0;
1930
doutc(cl, "%p %llx.%llx now !dirty\n", inode, ceph_vinop(inode));
1931
1932
swap(cf, ci->i_prealloc_cap_flush);
1933
cf->caps = flushing;
1934
cf->wake = wake;
1935
1936
spin_lock(&mdsc->cap_dirty_lock);
1937
list_del_init(&ci->i_dirty_item);
1938
1939
cf->tid = ++mdsc->last_cap_flush_tid;
1940
list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
1941
*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
1942
1943
if (list_empty(&ci->i_flushing_item)) {
1944
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1945
mdsc->num_cap_flushing++;
1946
}
1947
spin_unlock(&mdsc->cap_dirty_lock);
1948
1949
list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
1950
1951
return cf->tid;
1952
}
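/*
 * To summarize the transition above: the caps move from i_dirty_caps
 * to i_flushing_caps, the inode moves from the session's s_cap_dirty
 * list to its s_cap_flushing list, and the preallocated cap_flush is
 * stamped with the next global flush tid and queued on both the mdsc
 * and per-inode flush lists.
 */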
1953
1954
/*
1955
* try to invalidate mapping pages without blocking.
1956
*/
1957
static int try_nonblocking_invalidate(struct inode *inode)
1958
__releases(ci->i_ceph_lock)
1959
__acquires(ci->i_ceph_lock)
1960
{
1961
struct ceph_client *cl = ceph_inode_to_client(inode);
1962
struct ceph_inode_info *ci = ceph_inode(inode);
1963
u32 invalidating_gen = ci->i_rdcache_gen;
1964
1965
spin_unlock(&ci->i_ceph_lock);
1966
ceph_fscache_invalidate(inode, false);
1967
invalidate_mapping_pages(&inode->i_data, 0, -1);
1968
spin_lock(&ci->i_ceph_lock);
1969
1970
if (inode->i_data.nrpages == 0 &&
1971
invalidating_gen == ci->i_rdcache_gen) {
1972
/* success. */
1973
doutc(cl, "%p %llx.%llx success\n", inode,
1974
ceph_vinop(inode));
1975
/* save any racing async invalidate some trouble */
1976
ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
1977
return 0;
1978
}
1979
doutc(cl, "%p %llx.%llx failed\n", inode, ceph_vinop(inode));
1980
return -1;
1981
}
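/*
 * The i_rdcache_gen recheck above guards against a racing grant:
 * __check_cap_issue() bumps i_rdcache_gen whenever FILE_CACHE is
 * re-issued, so a gen change while i_ceph_lock was dropped means the
 * page cache may have been legitimately repopulated and the
 * invalidation must be reported as failed.
 */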
1982
1983
bool __ceph_should_report_size(struct ceph_inode_info *ci)
1984
{
1985
loff_t size = i_size_read(&ci->netfs.inode);
1986
/* mds will adjust max size according to the reported size */
1987
if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
1988
return false;
1989
if (size >= ci->i_max_size)
1990
return true;
1991
/* half of previous max_size increment has been used */
1992
if (ci->i_max_size > ci->i_reported_size &&
1993
(size << 1) >= ci->i_max_size + ci->i_reported_size)
1994
return true;
1995
return false;
1996
}
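/*
 * Worked example with hypothetical sizes: if i_reported_size is 4M and
 * i_max_size is 8M, then (size << 1) >= 8M + 4M first holds at
 * size == 6M, i.e. once half of the previously granted max_size
 * increment is used we ask the MDS for more headroom rather than
 * waiting to hit the hard limit.
 */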
1997
1998
/*
1999
* Swiss army knife function to examine currently used and wanted
2000
* versus held caps. Release, flush, ack revoked caps to mds as
2001
* appropriate.
2002
*
2003
* CHECK_CAPS_AUTHONLY - we should only check the auth cap
2004
* CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
2005
* further delay.
2006
* CHECK_CAPS_FLUSH_FORCE - we should send a cap message for every cap we
2007
* hold, without further delay (used when acking a cap revoke).
* CHECK_CAPS_NOINVAL - we should not try to invalidate the page cache.
2008
*/
2009
void ceph_check_caps(struct ceph_inode_info *ci, int flags)
2010
{
2011
struct inode *inode = &ci->netfs.inode;
2012
struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2013
struct ceph_client *cl = ceph_inode_to_client(inode);
2014
struct ceph_cap *cap;
2015
u64 flush_tid, oldest_flush_tid;
2016
int file_wanted, used, cap_used;
2017
int issued, implemented, want, retain, revoking, flushing = 0;
2018
int mds = -1; /* keep track of how far we've gone through i_caps list
2019
to avoid an infinite loop on retry */
2020
struct rb_node *p;
2021
bool queue_invalidate = false;
2022
bool tried_invalidate = false;
2023
bool queue_writeback = false;
2024
struct ceph_mds_session *session = NULL;
2025
2026
spin_lock(&ci->i_ceph_lock);
2027
if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
2028
ci->i_ceph_flags |= CEPH_I_ASYNC_CHECK_CAPS;
2029
2030
/* Don't send messages until we get async create reply */
2031
spin_unlock(&ci->i_ceph_lock);
2032
return;
2033
}
2034
2035
if (ci->i_ceph_flags & CEPH_I_FLUSH)
2036
flags |= CHECK_CAPS_FLUSH;
2037
retry:
2038
/* Caps wanted by virtue of active open files. */
2039
file_wanted = __ceph_caps_file_wanted(ci);
2040
2041
/* Caps which have active references against them */
2042
used = __ceph_caps_used(ci);
2043
2044
/*
2045
* "issued" represents the current caps that the MDS wants us to have.
2046
* "implemented" is the set that we have been granted, and includes the
2047
* ones that have not yet been returned to the MDS (the "revoking" set,
2048
* usually because they have outstanding references).
2049
*/
2050
issued = __ceph_caps_issued(ci, &implemented);
2051
revoking = implemented & ~issued;
2052
2053
want = file_wanted;
2054
2055
/* The ones we currently want to retain (may be adjusted below) */
2056
retain = file_wanted | used | CEPH_CAP_PIN;
2057
if (!mdsc->stopping && inode->i_nlink > 0) {
2058
if (file_wanted) {
2059
retain |= CEPH_CAP_ANY; /* be greedy */
2060
} else if (S_ISDIR(inode->i_mode) &&
2061
(issued & CEPH_CAP_FILE_SHARED) &&
2062
__ceph_dir_is_complete(ci)) {
2063
/*
2064
* If a directory is complete, we want to keep
2065
* the exclusive cap. So that MDS does not end up
2066
* revoking the shared cap on every create/unlink
2067
* operation.
2068
*/
2069
if (IS_RDONLY(inode)) {
2070
want = CEPH_CAP_ANY_SHARED;
2071
} else {
2072
want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
2073
}
2074
retain |= want;
2075
} else {
2076
2077
retain |= CEPH_CAP_ANY_SHARED;
2078
/*
2079
* keep RD only if we didn't have the file open RW,
2080
* because then the mds would revoke it anyway to
2081
* journal max_size=0.
2082
*/
2083
if (ci->i_max_size == 0)
2084
retain |= CEPH_CAP_ANY_RD;
2085
}
2086
}
2087
2088
doutc(cl, "%p %llx.%llx file_want %s used %s dirty %s "
2089
"flushing %s issued %s revoking %s retain %s %s%s%s%s\n",
2090
inode, ceph_vinop(inode), ceph_cap_string(file_wanted),
2091
ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
2092
ceph_cap_string(ci->i_flushing_caps),
2093
ceph_cap_string(issued), ceph_cap_string(revoking),
2094
ceph_cap_string(retain),
2095
(flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
2096
(flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "",
2097
(flags & CHECK_CAPS_NOINVAL) ? " NOINVAL" : "",
2098
(flags & CHECK_CAPS_FLUSH_FORCE) ? " FLUSH_FORCE" : "");
2099
2100
/*
2101
* If we no longer need to hold onto our old caps, and we may
2102
* have cached pages, but don't want them, then try to invalidate.
2103
* If we fail, it's because pages are locked... try again later.
2104
*/
2105
if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
2106
S_ISREG(inode->i_mode) &&
2107
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
2108
inode->i_data.nrpages && /* have cached pages */
2109
(revoking & (CEPH_CAP_FILE_CACHE|
2110
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
2111
!tried_invalidate) {
2112
doutc(cl, "trying to invalidate on %p %llx.%llx\n",
2113
inode, ceph_vinop(inode));
2114
if (try_nonblocking_invalidate(inode) < 0) {
2115
doutc(cl, "queuing invalidate\n");
2116
queue_invalidate = true;
2117
ci->i_rdcache_revoking = ci->i_rdcache_gen;
2118
}
2119
tried_invalidate = true;
2120
goto retry;
2121
}
2122
2123
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
2124
int mflags = 0;
2125
struct cap_msg_args arg;
2126
2127
cap = rb_entry(p, struct ceph_cap, ci_node);
2128
2129
/* avoid looping forever */
2130
if (mds >= cap->mds ||
2131
((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
2132
continue;
2133
2134
/*
2135
* If we have an auth cap, we don't need to consider any
2136
* overlapping caps as used.
2137
*/
2138
cap_used = used;
2139
if (ci->i_auth_cap && cap != ci->i_auth_cap)
2140
cap_used &= ~ci->i_auth_cap->issued;
2141
2142
revoking = cap->implemented & ~cap->issued;
2143
doutc(cl, " mds%d cap %p used %s issued %s implemented %s revoking %s\n",
2144
cap->mds, cap, ceph_cap_string(cap_used),
2145
ceph_cap_string(cap->issued),
2146
ceph_cap_string(cap->implemented),
2147
ceph_cap_string(revoking));
2148
2149
/* completed revocation? going down and there are no caps? */
2150
if (revoking) {
2151
if ((revoking & cap_used) == 0) {
2152
doutc(cl, "completed revocation of %s\n",
2153
ceph_cap_string(cap->implemented & ~cap->issued));
2154
goto ack;
2155
}
2156
2157
/*
2158
* If the "i_wrbuffer_ref" was increased by mmap or generic
2159
* cache write just before the ceph_check_caps() is called,
2160
* the Fb capability revoking will fail this time. Then we
2161
* must wait for the BDI's delayed work to flush the dirty
2162
* pages and to release the "i_wrbuffer_ref", which will cost
2163
* at most 5 seconds. That means the MDS needs to wait at
2164
* most 5 seconds to finish the Fb capability's revocation.
2165
*
2166
* Let's queue a writeback for it.
2167
*/
2168
if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
2169
(revoking & CEPH_CAP_FILE_BUFFER))
2170
queue_writeback = true;
2171
}
2172
2173
if (flags & CHECK_CAPS_FLUSH_FORCE) {
2174
doutc(cl, "force to flush caps\n");
2175
goto ack;
2176
}
2177
2178
if (cap == ci->i_auth_cap &&
2179
(cap->issued & CEPH_CAP_FILE_WR)) {
2180
/* request larger max_size from MDS? */
2181
if (ci->i_wanted_max_size > ci->i_max_size &&
2182
ci->i_wanted_max_size > ci->i_requested_max_size) {
2183
doutc(cl, "requesting new max_size\n");
2184
goto ack;
2185
}
2186
2187
/* approaching file_max? */
2188
if (__ceph_should_report_size(ci)) {
2189
doutc(cl, "i_size approaching max_size\n");
2190
goto ack;
2191
}
2192
}
2193
/* flush anything dirty? */
2194
if (cap == ci->i_auth_cap) {
2195
if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
2196
doutc(cl, "flushing dirty caps\n");
2197
goto ack;
2198
}
2199
if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
2200
doutc(cl, "flushing snap caps\n");
2201
goto ack;
2202
}
2203
}
2204
2205
/* want more caps from mds? */
2206
if (want & ~cap->mds_wanted) {
2207
if (want & ~(cap->mds_wanted | cap->issued))
2208
goto ack;
2209
if (!__cap_is_valid(cap))
2210
goto ack;
2211
}
2212
2213
/* things we might delay */
2214
if ((cap->issued & ~retain) == 0)
2215
continue; /* nope, all good */
2216
2217
ack:
2218
ceph_put_mds_session(session);
2219
session = ceph_get_mds_session(cap->session);
2220
2221
/* kick flushing and flush snaps before sending normal
2222
* cap message */
2223
if (cap == ci->i_auth_cap &&
2224
(ci->i_ceph_flags &
2225
(CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
2226
if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
2227
__kick_flushing_caps(mdsc, session, ci, 0);
2228
if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
2229
__ceph_flush_snaps(ci, session);
2230
2231
goto retry;
2232
}
2233
2234
if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
2235
flushing = ci->i_dirty_caps;
2236
flush_tid = __mark_caps_flushing(inode, session, false,
2237
&oldest_flush_tid);
2238
if (flags & CHECK_CAPS_FLUSH &&
2239
list_empty(&session->s_cap_dirty))
2240
mflags |= CEPH_CLIENT_CAPS_SYNC;
2241
} else {
2242
flushing = 0;
2243
flush_tid = 0;
2244
spin_lock(&mdsc->cap_dirty_lock);
2245
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2246
spin_unlock(&mdsc->cap_dirty_lock);
2247
}
2248
2249
mds = cap->mds; /* remember mds, so we don't repeat */
2250
2251
__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
2252
want, retain, flushing, flush_tid, oldest_flush_tid);
2253
2254
spin_unlock(&ci->i_ceph_lock);
2255
__send_cap(&arg, ci);
2256
spin_lock(&ci->i_ceph_lock);
2257
2258
goto retry; /* retake i_ceph_lock and restart our cap scan. */
2259
}
2260
2261
/* periodically re-calculate caps wanted by open files */
2262
if (__ceph_is_any_real_caps(ci) &&
2263
list_empty(&ci->i_cap_delay_list) &&
2264
(file_wanted & ~CEPH_CAP_PIN) &&
2265
!(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
2266
__cap_delay_requeue(mdsc, ci);
2267
}
2268
2269
spin_unlock(&ci->i_ceph_lock);
2270
2271
ceph_put_mds_session(session);
2272
if (queue_writeback)
2273
ceph_queue_writeback(inode);
2274
if (queue_invalidate)
2275
ceph_queue_invalidate(inode);
2276
}
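/*
 * Note on the retry pattern above: every time i_ceph_lock is dropped
 * (to send a cap message or kick flushing), the scan restarts from
 * "retry", and the saved mds number makes the rescan skip caps that
 * were already handled, so the walk always makes forward progress.
 */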
2277
2278
/*
2279
* Try to flush dirty caps back to the auth mds.
2280
*/
2281
static int try_flush_caps(struct inode *inode, u64 *ptid)
2282
{
2283
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
2284
struct ceph_inode_info *ci = ceph_inode(inode);
2285
int flushing = 0;
2286
u64 flush_tid = 0, oldest_flush_tid = 0;
2287
2288
spin_lock(&ci->i_ceph_lock);
2289
retry_locked:
2290
if (ci->i_dirty_caps && ci->i_auth_cap) {
2291
struct ceph_cap *cap = ci->i_auth_cap;
2292
struct cap_msg_args arg;
2293
struct ceph_mds_session *session = cap->session;
2294
2295
if (session->s_state < CEPH_MDS_SESSION_OPEN) {
2296
spin_unlock(&ci->i_ceph_lock);
2297
goto out;
2298
}
2299
2300
if (ci->i_ceph_flags &
2301
(CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
2302
if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
2303
__kick_flushing_caps(mdsc, session, ci, 0);
2304
if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
2305
__ceph_flush_snaps(ci, session);
2306
goto retry_locked;
2307
}
2308
2309
flushing = ci->i_dirty_caps;
2310
flush_tid = __mark_caps_flushing(inode, session, true,
2311
&oldest_flush_tid);
2312
2313
__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
2314
__ceph_caps_used(ci), __ceph_caps_wanted(ci),
2315
(cap->issued | cap->implemented),
2316
flushing, flush_tid, oldest_flush_tid);
2317
spin_unlock(&ci->i_ceph_lock);
2318
2319
__send_cap(&arg, ci);
2320
} else {
2321
if (!list_empty(&ci->i_cap_flush_list)) {
2322
struct ceph_cap_flush *cf =
2323
list_last_entry(&ci->i_cap_flush_list,
2324
struct ceph_cap_flush, i_list);
2325
cf->wake = true;
2326
flush_tid = cf->tid;
2327
}
2328
flushing = ci->i_flushing_caps;
2329
spin_unlock(&ci->i_ceph_lock);
2330
}
2331
out:
2332
*ptid = flush_tid;
2333
return flushing;
2334
}
2335
2336
/*
2337
* Return true if we've flushed caps through the given flush_tid.
2338
*/
2339
static int caps_are_flushed(struct inode *inode, u64 flush_tid)
2340
{
2341
struct ceph_inode_info *ci = ceph_inode(inode);
2342
int ret = 1;
2343
2344
spin_lock(&ci->i_ceph_lock);
2345
if (!list_empty(&ci->i_cap_flush_list)) {
2346
struct ceph_cap_flush *cf =
2347
list_first_entry(&ci->i_cap_flush_list,
2348
struct ceph_cap_flush, i_list);
2349
if (cf->tid <= flush_tid)
2350
ret = 0;
2351
}
2352
spin_unlock(&ci->i_ceph_lock);
2353
return ret;
2354
}
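/*
 * Example with hypothetical tids: if i_cap_flush_list still holds
 * flushes 12 and 15, then caps_are_flushed(inode, 11) returns 1 while
 * caps_are_flushed(inode, 12) returns 0, since the oldest pending
 * entry is what bounds how far we have flushed.
 */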
2355
2356
/*
2357
* flush the mdlog and wait for any unsafe requests to complete.
2358
*/
2359
static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
2360
{
2361
struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
2362
struct ceph_client *cl = ceph_inode_to_client(inode);
2363
struct ceph_inode_info *ci = ceph_inode(inode);
2364
struct ceph_mds_request *req1 = NULL, *req2 = NULL;
2365
int ret, err = 0;
2366
2367
spin_lock(&ci->i_unsafe_lock);
2368
if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
2369
req1 = list_last_entry(&ci->i_unsafe_dirops,
2370
struct ceph_mds_request,
2371
r_unsafe_dir_item);
2372
ceph_mdsc_get_request(req1);
2373
}
2374
if (!list_empty(&ci->i_unsafe_iops)) {
2375
req2 = list_last_entry(&ci->i_unsafe_iops,
2376
struct ceph_mds_request,
2377
r_unsafe_target_item);
2378
ceph_mdsc_get_request(req2);
2379
}
2380
spin_unlock(&ci->i_unsafe_lock);
2381
2382
/*
2383
* Trigger a flush of the journal logs in all the relevant MDSes
2384
* manually; otherwise, in the worst case, we must wait up to 5 seconds
2385
* for the MDSes to flush the journal logs periodically.
2386
*/
2387
if (req1 || req2) {
2388
struct ceph_mds_request *req;
2389
struct ceph_mds_session **sessions;
2390
struct ceph_mds_session *s;
2391
unsigned int max_sessions;
2392
int i;
2393
2394
mutex_lock(&mdsc->mutex);
2395
max_sessions = mdsc->max_sessions;
2396
2397
sessions = kcalloc(max_sessions, sizeof(s), GFP_KERNEL);
2398
if (!sessions) {
2399
mutex_unlock(&mdsc->mutex);
2400
err = -ENOMEM;
2401
goto out;
2402
}
2403
2404
spin_lock(&ci->i_unsafe_lock);
2405
if (req1) {
2406
list_for_each_entry(req, &ci->i_unsafe_dirops,
2407
r_unsafe_dir_item) {
2408
s = req->r_session;
2409
if (!s)
2410
continue;
2411
if (!sessions[s->s_mds]) {
2412
s = ceph_get_mds_session(s);
2413
sessions[s->s_mds] = s;
2414
}
2415
}
2416
}
2417
if (req2) {
2418
list_for_each_entry(req, &ci->i_unsafe_iops,
2419
r_unsafe_target_item) {
2420
s = req->r_session;
2421
if (!s)
2422
continue;
2423
if (!sessions[s->s_mds]) {
2424
s = ceph_get_mds_session(s);
2425
sessions[s->s_mds] = s;
2426
}
2427
}
2428
}
2429
spin_unlock(&ci->i_unsafe_lock);
2430
2431
/* the auth MDS */
2432
spin_lock(&ci->i_ceph_lock);
2433
if (ci->i_auth_cap) {
2434
s = ci->i_auth_cap->session;
2435
if (!sessions[s->s_mds])
2436
sessions[s->s_mds] = ceph_get_mds_session(s);
2437
}
2438
spin_unlock(&ci->i_ceph_lock);
2439
mutex_unlock(&mdsc->mutex);
2440
2441
/* send flush mdlog request to MDSes */
2442
for (i = 0; i < max_sessions; i++) {
2443
s = sessions[i];
2444
if (s) {
2445
send_flush_mdlog(s);
2446
ceph_put_mds_session(s);
2447
}
2448
}
2449
kfree(sessions);
2450
}
2451
2452
doutc(cl, "%p %llx.%llx wait on tid %llu %llu\n", inode,
2453
ceph_vinop(inode), req1 ? req1->r_tid : 0ULL,
2454
req2 ? req2->r_tid : 0ULL);
2455
if (req1) {
2456
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
2457
ceph_timeout_jiffies(req1->r_timeout));
2458
if (ret)
2459
err = -EIO;
2460
}
2461
if (req2) {
2462
ret = !wait_for_completion_timeout(&req2->r_safe_completion,
2463
ceph_timeout_jiffies(req2->r_timeout));
2464
if (ret)
2465
err = -EIO;
2466
}
2467
2468
out:
2469
if (req1)
2470
ceph_mdsc_put_request(req1);
2471
if (req2)
2472
ceph_mdsc_put_request(req2);
2473
return err;
2474
}
2475
2476
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2477
{
2478
struct inode *inode = file->f_mapping->host;
2479
struct ceph_inode_info *ci = ceph_inode(inode);
2480
struct ceph_client *cl = ceph_inode_to_client(inode);
2481
u64 flush_tid;
2482
int ret, err;
2483
int dirty;
2484
2485
doutc(cl, "%p %llx.%llx%s\n", inode, ceph_vinop(inode),
2486
datasync ? " datasync" : "");
2487
2488
ret = file_write_and_wait_range(file, start, end);
2489
if (datasync)
2490
goto out;
2491
2492
ret = ceph_wait_on_async_create(inode);
2493
if (ret)
2494
goto out;
2495
2496
dirty = try_flush_caps(inode, &flush_tid);
2497
doutc(cl, "dirty caps are %s\n", ceph_cap_string(dirty));
2498
2499
err = flush_mdlog_and_wait_inode_unsafe_requests(inode);
2500
2501
/*
2502
* only wait on non-file metadata writeback (the mds
2503
* can recover size and mtime, so we don't need to
2504
* wait for that)
2505
*/
2506
if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
2507
err = wait_event_interruptible(ci->i_cap_wq,
2508
caps_are_flushed(inode, flush_tid));
2509
}
2510
2511
if (err < 0)
2512
ret = err;
2513
2514
err = file_check_and_advance_wb_err(file);
2515
if (err < 0)
2516
ret = err;
2517
out:
2518
doutc(cl, "%p %llx.%llx%s result=%d\n", inode, ceph_vinop(inode),
2519
datasync ? " datasync" : "", ret);
2520
return ret;
2521
}
2522
2523
/*
2524
* Flush any dirty caps back to the mds. If we aren't asked to wait,
2525
* queue inode for flush but don't do so immediately, because we can
2526
* get by with fewer MDS messages if we wait for data writeback to
2527
* complete first.
2528
*/
2529
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
2530
{
2531
struct ceph_inode_info *ci = ceph_inode(inode);
2532
struct ceph_client *cl = ceph_inode_to_client(inode);
2533
u64 flush_tid;
2534
int err = 0;
2535
int dirty;
2536
int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
2537
2538
doutc(cl, "%p %llx.%llx wait=%d\n", inode, ceph_vinop(inode), wait);
2539
ceph_fscache_unpin_writeback(inode, wbc);
2540
if (wait) {
2541
err = ceph_wait_on_async_create(inode);
2542
if (err)
2543
return err;
2544
dirty = try_flush_caps(inode, &flush_tid);
2545
if (dirty)
2546
err = wait_event_interruptible(ci->i_cap_wq,
2547
caps_are_flushed(inode, flush_tid));
2548
} else {
2549
struct ceph_mds_client *mdsc =
2550
ceph_sb_to_fs_client(inode->i_sb)->mdsc;
2551
2552
spin_lock(&ci->i_ceph_lock);
2553
if (__ceph_caps_dirty(ci))
2554
__cap_delay_requeue_front(mdsc, ci);
2555
spin_unlock(&ci->i_ceph_lock);
2556
}
2557
return err;
2558
}
2559
2560
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
2561
struct ceph_mds_session *session,
2562
struct ceph_inode_info *ci,
2563
u64 oldest_flush_tid)
2564
__releases(ci->i_ceph_lock)
2565
__acquires(ci->i_ceph_lock)
2566
{
2567
struct inode *inode = &ci->netfs.inode;
2568
struct ceph_client *cl = mdsc->fsc->client;
2569
struct ceph_cap *cap;
2570
struct ceph_cap_flush *cf;
2571
int ret;
2572
u64 first_tid = 0;
2573
u64 last_snap_flush = 0;
2574
2575
/* Don't do anything until create reply comes in */
2576
if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE)
2577
return;
2578
2579
ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
2580
2581
list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
2582
if (cf->is_capsnap) {
2583
last_snap_flush = cf->tid;
2584
break;
2585
}
2586
}
2587
2588
list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
2589
if (cf->tid < first_tid)
2590
continue;
2591
2592
cap = ci->i_auth_cap;
2593
if (!(cap && cap->session == session)) {
2594
pr_err_client(cl, "%p auth cap %p not mds%d ???\n",
2595
inode, cap, session->s_mds);
2596
break;
2597
}
2598
2599
first_tid = cf->tid + 1;
2600
2601
if (!cf->is_capsnap) {
2602
struct cap_msg_args arg;
2603
2604
doutc(cl, "%p %llx.%llx cap %p tid %llu %s\n",
2605
inode, ceph_vinop(inode), cap, cf->tid,
2606
ceph_cap_string(cf->caps));
2607
__prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH,
2608
(cf->tid < last_snap_flush ?
2609
CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
2610
__ceph_caps_used(ci),
2611
__ceph_caps_wanted(ci),
2612
(cap->issued | cap->implemented),
2613
cf->caps, cf->tid, oldest_flush_tid);
2614
spin_unlock(&ci->i_ceph_lock);
2615
__send_cap(&arg, ci);
2616
} else {
2617
struct ceph_cap_snap *capsnap =
2618
container_of(cf, struct ceph_cap_snap,
2619
cap_flush);
2620
doutc(cl, "%p %llx.%llx capsnap %p tid %llu %s\n",
2621
inode, ceph_vinop(inode), capsnap, cf->tid,
2622
ceph_cap_string(capsnap->dirty));
2623
2624
refcount_inc(&capsnap->nref);
2625
spin_unlock(&ci->i_ceph_lock);
2626
2627
ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
2628
oldest_flush_tid);
2629
if (ret < 0) {
2630
pr_err_client(cl, "error sending cap flushsnap,"
2631
" %p %llx.%llx tid %llu follows %llu\n",
2632
inode, ceph_vinop(inode), cf->tid,
2633
capsnap->follows);
2634
}
2635
2636
ceph_put_cap_snap(capsnap);
2637
}
2638
2639
spin_lock(&ci->i_ceph_lock);
2640
}
2641
}
2642
2643
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
2644
struct ceph_mds_session *session)
2645
{
2646
struct ceph_client *cl = mdsc->fsc->client;
2647
struct ceph_inode_info *ci;
2648
struct ceph_cap *cap;
2649
u64 oldest_flush_tid;
2650
2651
doutc(cl, "mds%d\n", session->s_mds);
2652
2653
spin_lock(&mdsc->cap_dirty_lock);
2654
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2655
spin_unlock(&mdsc->cap_dirty_lock);
2656
2657
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
2658
struct inode *inode = &ci->netfs.inode;
2659
2660
spin_lock(&ci->i_ceph_lock);
2661
cap = ci->i_auth_cap;
2662
if (!(cap && cap->session == session)) {
2663
pr_err_client(cl, "%p %llx.%llx auth cap %p not mds%d ???\n",
2664
inode, ceph_vinop(inode), cap,
2665
session->s_mds);
2666
spin_unlock(&ci->i_ceph_lock);
2667
continue;
2668
}
2669
2670
2671
/*
2672
* if flushing caps were revoked, we re-send the cap flush
2673
* in the client reconnect stage. This guarantees the MDS processes
2674
* the cap flush message before issuing the flushing caps to
2675
* other clients.
2676
*/
2677
if ((cap->issued & ci->i_flushing_caps) !=
2678
ci->i_flushing_caps) {
2679
/* encode_caps_cb() will also reset these sequence
2680
* numbers. Make sure the sequence numbers in the cap flush
2681
* message match the later reconnect message. */
2682
cap->seq = 0;
2683
cap->issue_seq = 0;
2684
cap->mseq = 0;
2685
__kick_flushing_caps(mdsc, session, ci,
2686
oldest_flush_tid);
2687
} else {
2688
ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
2689
}
2690
2691
spin_unlock(&ci->i_ceph_lock);
2692
}
2693
}
2694
2695
void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
2696
struct ceph_mds_session *session)
2697
{
2698
struct ceph_client *cl = mdsc->fsc->client;
2699
struct ceph_inode_info *ci;
2700
struct ceph_cap *cap;
2701
u64 oldest_flush_tid;
2702
2703
lockdep_assert_held(&session->s_mutex);
2704
2705
doutc(cl, "mds%d\n", session->s_mds);
2706
2707
spin_lock(&mdsc->cap_dirty_lock);
2708
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2709
spin_unlock(&mdsc->cap_dirty_lock);
2710
2711
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
2712
struct inode *inode = &ci->netfs.inode;
2713
2714
spin_lock(&ci->i_ceph_lock);
2715
cap = ci->i_auth_cap;
2716
if (!(cap && cap->session == session)) {
2717
pr_err_client(cl, "%p %llx.%llx auth cap %p not mds%d ???\n",
2718
inode, ceph_vinop(inode), cap,
2719
session->s_mds);
2720
spin_unlock(&ci->i_ceph_lock);
2721
continue;
2722
}
2723
if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
2724
__kick_flushing_caps(mdsc, session, ci,
2725
oldest_flush_tid);
2726
}
2727
spin_unlock(&ci->i_ceph_lock);
2728
}
2729
}
2730
2731
void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
2732
struct ceph_inode_info *ci)
2733
{
2734
struct ceph_mds_client *mdsc = session->s_mdsc;
2735
struct ceph_cap *cap = ci->i_auth_cap;
2736
struct inode *inode = &ci->netfs.inode;
2737
2738
lockdep_assert_held(&ci->i_ceph_lock);
2739
2740
doutc(mdsc->fsc->client, "%p %llx.%llx flushing %s\n",
2741
inode, ceph_vinop(inode),
2742
ceph_cap_string(ci->i_flushing_caps));
2743
2744
if (!list_empty(&ci->i_cap_flush_list)) {
2745
u64 oldest_flush_tid;
2746
spin_lock(&mdsc->cap_dirty_lock);
2747
list_move_tail(&ci->i_flushing_item,
2748
&cap->session->s_cap_flushing);
2749
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2750
spin_unlock(&mdsc->cap_dirty_lock);
2751
2752
__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
2753
}
2754
}
2755
2756
2757
/*
2758
* Take references to capabilities we hold, so that we don't release
2759
* them to the MDS prematurely.
2760
*/
2761
void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
2762
bool snap_rwsem_locked)
2763
{
2764
struct inode *inode = &ci->netfs.inode;
2765
struct ceph_client *cl = ceph_inode_to_client(inode);
2766
2767
lockdep_assert_held(&ci->i_ceph_lock);
2768
2769
if (got & CEPH_CAP_PIN)
2770
ci->i_pin_ref++;
2771
if (got & CEPH_CAP_FILE_RD)
2772
ci->i_rd_ref++;
2773
if (got & CEPH_CAP_FILE_CACHE)
2774
ci->i_rdcache_ref++;
2775
if (got & CEPH_CAP_FILE_EXCL)
2776
ci->i_fx_ref++;
2777
if (got & CEPH_CAP_FILE_WR) {
2778
if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
2779
BUG_ON(!snap_rwsem_locked);
2780
ci->i_head_snapc = ceph_get_snap_context(
2781
ci->i_snap_realm->cached_context);
2782
}
2783
ci->i_wr_ref++;
2784
}
2785
if (got & CEPH_CAP_FILE_BUFFER) {
2786
if (ci->i_wb_ref == 0)
2787
ihold(inode);
2788
ci->i_wb_ref++;
2789
doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
2790
ceph_vinop(inode), ci->i_wb_ref-1, ci->i_wb_ref);
2791
}
2792
}
2793
2794
/*
2795
* Try to grab cap references. Specify those refs we @want, and the
2796
* minimal set we @need. Also include the larger offset we are writing
2797
* to (when applicable), and check against max_size here as well.
2798
* Note that caller is responsible for ensuring max_size increases are
2799
* requested from the MDS.
2800
*
2801
* Returns 0 if caps were not able to be acquired (yet), 1 on success,
2802
* or a negative error code. There are 3 special error codes:
2803
* -EAGAIN: need to sleep but non-blocking is specified
2804
* -EFBIG: ask caller to call check_max_size() and try again.
2805
* -EUCLEAN: ask caller to call ceph_renew_caps() and try again.
2806
*/
2807
enum {
2808
/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
2809
NON_BLOCKING = (1 << 8),
2810
CHECK_FILELOCK = (1 << 9),
2811
};
2812
2813
static int try_get_cap_refs(struct inode *inode, int need, int want,
2814
loff_t endoff, int flags, int *got)
2815
{
2816
struct ceph_inode_info *ci = ceph_inode(inode);
2817
struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
2818
struct ceph_client *cl = ceph_inode_to_client(inode);
2819
int ret = 0;
2820
int have, implemented;
2821
bool snap_rwsem_locked = false;
2822
2823
doutc(cl, "%p %llx.%llx need %s want %s\n", inode,
2824
ceph_vinop(inode), ceph_cap_string(need),
2825
ceph_cap_string(want));
2826
2827
again:
2828
spin_lock(&ci->i_ceph_lock);
2829
2830
if ((flags & CHECK_FILELOCK) &&
2831
(ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
2832
doutc(cl, "%p %llx.%llx error filelock\n", inode,
2833
ceph_vinop(inode));
2834
ret = -EIO;
2835
goto out_unlock;
2836
}
2837
2838
/* finish pending truncate */
2839
while (ci->i_truncate_pending) {
2840
spin_unlock(&ci->i_ceph_lock);
2841
if (snap_rwsem_locked) {
2842
up_read(&mdsc->snap_rwsem);
2843
snap_rwsem_locked = false;
2844
}
2845
__ceph_do_pending_vmtruncate(inode);
2846
spin_lock(&ci->i_ceph_lock);
2847
}
2848
2849
have = __ceph_caps_issued(ci, &implemented);
2850
2851
if (have & need & CEPH_CAP_FILE_WR) {
2852
if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
2853
doutc(cl, "%p %llx.%llx endoff %llu > maxsize %llu\n",
2854
inode, ceph_vinop(inode), endoff, ci->i_max_size);
2855
if (endoff > ci->i_requested_max_size)
2856
ret = ci->i_auth_cap ? -EFBIG : -EUCLEAN;
2857
goto out_unlock;
2858
}
2859
/*
2860
* If a sync write is in progress, we must wait, so that we
2861
* can get a final snapshot value for size+mtime.
2862
*/
2863
if (__ceph_have_pending_cap_snap(ci)) {
2864
doutc(cl, "%p %llx.%llx cap_snap_pending\n", inode,
2865
ceph_vinop(inode));
2866
goto out_unlock;
2867
}
2868
}
2869
2870
if ((have & need) == need) {
2871
/*
2872
* Look at (implemented & ~have & not) so that we keep waiting
2873
* on transition from wanted -> needed caps. This is needed
2874
* for WRBUFFER|WR -> WR to avoid a new WR sync write from
2875
* going before a prior buffered writeback happens.
2876
*
2877
* For RDCACHE|RD -> RD, there is no need to wait and we can
2878
* just exclude the revoking caps and force a sync read.
2879
*/
2880
int not = want & ~(have & need);
2881
int revoking = implemented & ~have;
2882
int exclude = revoking & not;
2883
doutc(cl, "%p %llx.%llx have %s but not %s (revoking %s)\n",
2884
inode, ceph_vinop(inode), ceph_cap_string(have),
2885
ceph_cap_string(not), ceph_cap_string(revoking));
2886
if (!exclude || !(exclude & CEPH_CAP_FILE_BUFFER)) {
2887
if (!snap_rwsem_locked &&
2888
!ci->i_head_snapc &&
2889
(need & CEPH_CAP_FILE_WR)) {
2890
if (!down_read_trylock(&mdsc->snap_rwsem)) {
2891
/*
2892
* we can not call down_read() when
2893
* task isn't in TASK_RUNNING state
2894
*/
2895
if (flags & NON_BLOCKING) {
2896
ret = -EAGAIN;
2897
goto out_unlock;
2898
}
2899
2900
spin_unlock(&ci->i_ceph_lock);
2901
down_read(&mdsc->snap_rwsem);
2902
snap_rwsem_locked = true;
2903
goto again;
2904
}
2905
snap_rwsem_locked = true;
2906
}
2907
if ((have & want) == want)
2908
*got = need | (want & ~exclude);
2909
else
2910
*got = need;
2911
ceph_take_cap_refs(ci, *got, true);
2912
ret = 1;
2913
}
2914
} else {
2915
int session_readonly = false;
2916
int mds_wanted;
2917
if (ci->i_auth_cap &&
2918
(need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
2919
struct ceph_mds_session *s = ci->i_auth_cap->session;
2920
spin_lock(&s->s_cap_lock);
2921
session_readonly = s->s_readonly;
2922
spin_unlock(&s->s_cap_lock);
2923
}
2924
if (session_readonly) {
2925
doutc(cl, "%p %llx.%llx need %s but mds%d readonly\n",
2926
inode, ceph_vinop(inode), ceph_cap_string(need),
2927
ci->i_auth_cap->mds);
2928
ret = -EROFS;
2929
goto out_unlock;
2930
}
2931
2932
if (ceph_inode_is_shutdown(inode)) {
2933
doutc(cl, "%p %llx.%llx inode is shutdown\n",
2934
inode, ceph_vinop(inode));
2935
ret = -ESTALE;
2936
goto out_unlock;
2937
}
2938
mds_wanted = __ceph_caps_mds_wanted(ci, false);
2939
if (need & ~mds_wanted) {
2940
doutc(cl, "%p %llx.%llx need %s > mds_wanted %s\n",
2941
inode, ceph_vinop(inode), ceph_cap_string(need),
2942
ceph_cap_string(mds_wanted));
2943
ret = -EUCLEAN;
2944
goto out_unlock;
2945
}
2946
2947
doutc(cl, "%p %llx.%llx have %s need %s\n", inode,
2948
ceph_vinop(inode), ceph_cap_string(have),
2949
ceph_cap_string(need));
2950
}
2951
out_unlock:
2952
2953
__ceph_touch_fmode(ci, mdsc, flags);
2954
2955
spin_unlock(&ci->i_ceph_lock);
2956
if (snap_rwsem_locked)
2957
up_read(&mdsc->snap_rwsem);
2958
2959
if (!ret)
2960
ceph_update_cap_mis(&mdsc->metric);
2961
else if (ret == 1)
2962
ceph_update_cap_hit(&mdsc->metric);
2963
2964
doutc(cl, "%p %llx.%llx ret %d got %s\n", inode,
2965
ceph_vinop(inode), ret, ceph_cap_string(*got));
2966
return ret;
2967
}
2968
2969
/*
2970
* Check the offset we are writing up to against our current
2971
* max_size. If necessary, tell the MDS we want to write to
2972
* a larger offset.
2973
*/
2974
static void check_max_size(struct inode *inode, loff_t endoff)
2975
{
2976
struct ceph_inode_info *ci = ceph_inode(inode);
2977
struct ceph_client *cl = ceph_inode_to_client(inode);
2978
int check = 0;
2979
2980
/* do we need to explicitly request a larger max_size? */
2981
spin_lock(&ci->i_ceph_lock);
2982
if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
2983
doutc(cl, "write %p %llx.%llx at large endoff %llu, req max_size\n",
2984
inode, ceph_vinop(inode), endoff);
2985
ci->i_wanted_max_size = endoff;
2986
}
2987
/* duplicate ceph_check_caps()'s logic */
2988
if (ci->i_auth_cap &&
2989
(ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
2990
ci->i_wanted_max_size > ci->i_max_size &&
2991
ci->i_wanted_max_size > ci->i_requested_max_size)
2992
check = 1;
2993
spin_unlock(&ci->i_ceph_lock);
2994
if (check)
2995
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY);
2996
}
2997
2998
static inline int get_used_fmode(int caps)
2999
{
3000
int fmode = 0;
3001
if (caps & CEPH_CAP_FILE_RD)
3002
fmode |= CEPH_FILE_MODE_RD;
3003
if (caps & CEPH_CAP_FILE_WR)
3004
fmode |= CEPH_FILE_MODE_WR;
3005
return fmode;
3006
}
3007
3008
int ceph_try_get_caps(struct inode *inode, int need, int want,
3009
bool nonblock, int *got)
3010
{
3011
int ret, flags;
3012
3013
BUG_ON(need & ~CEPH_CAP_FILE_RD);
3014
BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
3015
CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
3016
CEPH_CAP_ANY_DIR_OPS));
3017
if (need) {
3018
ret = ceph_pool_perm_check(inode, need);
3019
if (ret < 0)
3020
return ret;
3021
}
3022
3023
flags = get_used_fmode(need | want);
3024
if (nonblock)
3025
flags |= NON_BLOCKING;
3026
3027
ret = try_get_cap_refs(inode, need, want, 0, flags, got);
3028
/* three special error codes */
3029
if (ret == -EAGAIN || ret == -EFBIG || ret == -EUCLEAN)
3030
ret = 0;
3031
return ret;
3032
}
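/*
 * A minimal sketch of an opportunistic caller (hypothetical; error
 * handling elided), taking Fc alongside Fr without blocking:
 *
 *	int got = 0;
 *
 *	if (ceph_try_get_caps(inode, CEPH_CAP_FILE_RD,
 *			      CEPH_CAP_FILE_CACHE, true, &got) == 1) {
 *		... read via the page cache ...
 *		ceph_put_cap_refs(ceph_inode(inode), got);
 *	}
 */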
3033
3034
/*
3035
* Wait for caps, and take cap references. If we can't get a WR cap
3036
* due to a small max_size, make sure we check_max_size (and possibly
3037
* ask the mds) so we don't get hung up indefinitely.
3038
*/
3039
int __ceph_get_caps(struct inode *inode, struct ceph_file_info *fi, int need,
3040
int want, loff_t endoff, int *got)
3041
{
3042
struct ceph_inode_info *ci = ceph_inode(inode);
3043
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
3044
int ret, _got, flags;
3045
3046
ret = ceph_pool_perm_check(inode, need);
3047
if (ret < 0)
3048
return ret;
3049
3050
if (fi && (fi->fmode & CEPH_FILE_MODE_WR) &&
3051
fi->filp_gen != READ_ONCE(fsc->filp_gen))
3052
return -EBADF;
3053
3054
flags = get_used_fmode(need | want);
3055
3056
while (true) {
3057
flags &= CEPH_FILE_MODE_MASK;
3058
if (vfs_inode_has_locks(inode))
3059
flags |= CHECK_FILELOCK;
3060
_got = 0;
3061
ret = try_get_cap_refs(inode, need, want, endoff,
3062
flags, &_got);
3063
WARN_ON_ONCE(ret == -EAGAIN);
3064
if (!ret) {
3065
#ifdef CONFIG_DEBUG_FS
3066
struct ceph_mds_client *mdsc = fsc->mdsc;
3067
struct cap_wait cw;
3068
#endif
3069
DEFINE_WAIT_FUNC(wait, woken_wake_function);
3070
3071
#ifdef CONFIG_DEBUG_FS
3072
cw.ino = ceph_ino(inode);
3073
cw.tgid = current->tgid;
3074
cw.need = need;
3075
cw.want = want;
3076
3077
spin_lock(&mdsc->caps_list_lock);
3078
list_add(&cw.list, &mdsc->cap_wait_list);
3079
spin_unlock(&mdsc->caps_list_lock);
3080
#endif
3081
3082
/* make sure the used fmode doesn't time out */
3083
ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
3084
add_wait_queue(&ci->i_cap_wq, &wait);
3085
3086
flags |= NON_BLOCKING;
3087
while (!(ret = try_get_cap_refs(inode, need, want,
3088
endoff, flags, &_got))) {
3089
if (signal_pending(current)) {
3090
ret = -ERESTARTSYS;
3091
break;
3092
}
3093
wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
3094
}
3095
3096
remove_wait_queue(&ci->i_cap_wq, &wait);
3097
ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);
3098
3099
#ifdef CONFIG_DEBUG_FS
3100
spin_lock(&mdsc->caps_list_lock);
3101
list_del(&cw.list);
3102
spin_unlock(&mdsc->caps_list_lock);
3103
#endif
3104
3105
if (ret == -EAGAIN)
3106
continue;
3107
}
3108
3109
if (fi && (fi->fmode & CEPH_FILE_MODE_WR) &&
3110
fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
3111
if (ret >= 0 && _got)
3112
ceph_put_cap_refs(ci, _got);
3113
return -EBADF;
3114
}
3115
3116
if (ret < 0) {
3117
if (ret == -EFBIG || ret == -EUCLEAN) {
3118
int ret2 = ceph_wait_on_async_create(inode);
3119
if (ret2 < 0)
3120
return ret2;
3121
}
3122
if (ret == -EFBIG) {
3123
check_max_size(inode, endoff);
3124
continue;
3125
}
3126
if (ret == -EUCLEAN) {
3127
/* session was killed, try to renew caps */
3128
ret = ceph_renew_caps(inode, flags);
3129
if (ret == 0)
3130
continue;
3131
}
3132
return ret;
3133
}
3134
3135
if (S_ISREG(ci->netfs.inode.i_mode) &&
3136
ceph_has_inline_data(ci) &&
3137
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
3138
i_size_read(inode) > 0) {
3139
struct page *page =
3140
find_get_page(inode->i_mapping, 0);
3141
if (page) {
3142
bool uptodate = PageUptodate(page);
3143
3144
put_page(page);
3145
if (uptodate)
3146
break;
3147
}
3148
/*
3149
* drop cap refs first because getattr while
3150
* holding caps refs can cause deadlock.
3151
*/
3152
ceph_put_cap_refs(ci, _got);
3153
_got = 0;
3154
3155
/*
3156
* getattr request will bring inline data into
3157
* page cache
3158
*/
3159
ret = __ceph_do_getattr(inode, NULL,
3160
CEPH_STAT_CAP_INLINE_DATA,
3161
true);
3162
if (ret < 0)
3163
return ret;
3164
continue;
3165
}
3166
break;
3167
}
3168
*got = _got;
3169
return 0;
3170
}
3171
3172
int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff,
3173
int *got)
3174
{
3175
struct ceph_file_info *fi = filp->private_data;
3176
struct inode *inode = file_inode(filp);
3177
3178
return __ceph_get_caps(inode, fi, need, want, endoff, got);
3179
}
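/*
 * A minimal sketch of a write-path caller (hypothetical; error handling
 * elided): take Fw, and ideally Fb, out to the end of the write, then
 * drop the refs once the dirty pages are accounted:
 *
 *	int got = 0, err;
 *
 *	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
 *			    pos + count, &got);
 *	if (err < 0)
 *		return err;
 *	... dirty the pages ...
 *	ceph_put_cap_refs(ceph_inode(inode), got);
 */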
3180
3181
/*
3182
* Take cap refs. Caller must already know we hold at least one ref
3183
* on the caps in question or we don't know this is safe.
3184
*/
3185
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
3186
{
3187
spin_lock(&ci->i_ceph_lock);
3188
ceph_take_cap_refs(ci, caps, false);
3189
spin_unlock(&ci->i_ceph_lock);
3190
}
3191
3192
3193
/*
3194
* drop cap_snap that is not associated with any snapshot.
3195
* we don't need to send FLUSHSNAP message for it.
3196
*/
3197
static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
3198
struct ceph_cap_snap *capsnap)
3199
{
3200
struct inode *inode = &ci->netfs.inode;
3201
struct ceph_client *cl = ceph_inode_to_client(inode);
3202
3203
if (!capsnap->need_flush &&
3204
!capsnap->writing && !capsnap->dirty_pages) {
3205
doutc(cl, "%p follows %llu\n", capsnap, capsnap->follows);
3206
BUG_ON(capsnap->cap_flush.tid > 0);
3207
ceph_put_snap_context(capsnap->context);
3208
if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
3209
ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
3210
3211
list_del(&capsnap->ci_item);
3212
ceph_put_cap_snap(capsnap);
3213
return 1;
3214
}
3215
return 0;
3216
}
3217
3218
enum put_cap_refs_mode {
3219
PUT_CAP_REFS_SYNC = 0,
3220
PUT_CAP_REFS_ASYNC,
3221
};
3222
3223
/*
3224
* Release cap refs.
3225
*
3226
* If we released the last ref on any given cap, call ceph_check_caps
3227
* to release (or schedule a release).
3228
*
3229
* If we are releasing a WR cap (from a sync write), finalize any affected
3230
* cap_snap, and wake up any waiters.
3231
*/
3232
static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
3233
enum put_cap_refs_mode mode)
3234
{
3235
struct inode *inode = &ci->netfs.inode;
3236
struct ceph_client *cl = ceph_inode_to_client(inode);
3237
int last = 0, put = 0, flushsnaps = 0, wake = 0;
3238
bool check_flushsnaps = false;
3239
3240
spin_lock(&ci->i_ceph_lock);
3241
if (had & CEPH_CAP_PIN)
3242
--ci->i_pin_ref;
3243
if (had & CEPH_CAP_FILE_RD)
3244
if (--ci->i_rd_ref == 0)
3245
last++;
3246
if (had & CEPH_CAP_FILE_CACHE)
3247
if (--ci->i_rdcache_ref == 0)
3248
last++;
3249
if (had & CEPH_CAP_FILE_EXCL)
3250
if (--ci->i_fx_ref == 0)
3251
last++;
3252
if (had & CEPH_CAP_FILE_BUFFER) {
3253
if (--ci->i_wb_ref == 0) {
3254
last++;
3255
/* put the ref held by ceph_take_cap_refs() */
3256
put++;
3257
check_flushsnaps = true;
3258
}
3259
doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
3260
ceph_vinop(inode), ci->i_wb_ref+1, ci->i_wb_ref);
3261
}
3262
if (had & CEPH_CAP_FILE_WR) {
3263
if (--ci->i_wr_ref == 0) {
3264
/*
3265
* The Fb caps will always be taken and released
3266
* together with the Fw caps.
3267
*/
3268
WARN_ON_ONCE(ci->i_wb_ref);
3269
3270
last++;
3271
check_flushsnaps = true;
3272
if (ci->i_wrbuffer_ref_head == 0 &&
3273
ci->i_dirty_caps == 0 &&
3274
ci->i_flushing_caps == 0) {
3275
BUG_ON(!ci->i_head_snapc);
3276
ceph_put_snap_context(ci->i_head_snapc);
3277
ci->i_head_snapc = NULL;
3278
}
3279
/* see comment in __ceph_remove_cap() */
3280
if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
3281
ceph_change_snap_realm(inode, NULL);
3282
}
3283
}
3284
if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
3285
struct ceph_cap_snap *capsnap =
3286
list_last_entry(&ci->i_cap_snaps,
3287
struct ceph_cap_snap,
3288
ci_item);
3289
3290
capsnap->writing = 0;
3291
if (ceph_try_drop_cap_snap(ci, capsnap))
3292
/* put the ref held by ceph_queue_cap_snap() */
3293
put++;
3294
else if (__ceph_finish_cap_snap(ci, capsnap))
3295
flushsnaps = 1;
3296
wake = 1;
3297
}
3298
spin_unlock(&ci->i_ceph_lock);
3299
3300
doutc(cl, "%p %llx.%llx had %s%s%s\n", inode, ceph_vinop(inode),
3301
ceph_cap_string(had), last ? " last" : "", put ? " put" : "");
3302
3303
switch (mode) {
3304
case PUT_CAP_REFS_SYNC:
3305
if (last)
3306
ceph_check_caps(ci, 0);
3307
else if (flushsnaps)
3308
ceph_flush_snaps(ci, NULL);
3309
break;
3310
case PUT_CAP_REFS_ASYNC:
3311
if (last)
3312
ceph_queue_check_caps(inode);
3313
else if (flushsnaps)
3314
ceph_queue_flush_snaps(inode);
3315
break;
3316
default:
3317
break;
3318
}
3319
if (wake)
3320
wake_up_all(&ci->i_cap_wq);
3321
while (put-- > 0)
3322
iput(inode);
3323
}
3324
3325
void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
3326
{
3327
__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_SYNC);
3328
}
3329
3330
void ceph_put_cap_refs_async(struct ceph_inode_info *ci, int had)
3331
{
3332
__ceph_put_cap_refs(ci, had, PUT_CAP_REFS_ASYNC);
3333
}
3334
3335
/*
3336
* Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
3337
* context. Adjust per-snap dirty page accounting as appropriate.
3338
* Once all dirty data for a cap_snap is flushed, flush snapped file
3339
* metadata back to the MDS. If we dropped the last ref, call
3340
* ceph_check_caps.
3341
*/
3342
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
3343
struct ceph_snap_context *snapc)
3344
{
3345
struct inode *inode = &ci->netfs.inode;
3346
struct ceph_client *cl = ceph_inode_to_client(inode);
3347
struct ceph_cap_snap *capsnap = NULL, *iter;
3348
int put = 0;
3349
bool last = false;
3350
bool flush_snaps = false;
3351
bool complete_capsnap = false;
3352
3353
spin_lock(&ci->i_ceph_lock);
3354
ci->i_wrbuffer_ref -= nr;
3355
if (ci->i_wrbuffer_ref == 0) {
3356
last = true;
3357
put++;
3358
}
3359
3360
if (ci->i_head_snapc == snapc) {
3361
ci->i_wrbuffer_ref_head -= nr;
3362
if (ci->i_wrbuffer_ref_head == 0 &&
3363
ci->i_wr_ref == 0 &&
3364
ci->i_dirty_caps == 0 &&
3365
ci->i_flushing_caps == 0) {
3366
BUG_ON(!ci->i_head_snapc);
3367
ceph_put_snap_context(ci->i_head_snapc);
3368
ci->i_head_snapc = NULL;
3369
}
3370
doutc(cl, "on %p %llx.%llx head %d/%d -> %d/%d %s\n",
3371
inode, ceph_vinop(inode), ci->i_wrbuffer_ref+nr,
3372
ci->i_wrbuffer_ref_head+nr, ci->i_wrbuffer_ref,
3373
ci->i_wrbuffer_ref_head, last ? " LAST" : "");
3374
} else {
3375
list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
3376
if (iter->context == snapc) {
3377
capsnap = iter;
3378
break;
3379
}
3380
}
3381
3382
if (!capsnap) {
3383
/*
3384
* The capsnap should already be removed when removing
3385
* auth cap in the case of a forced unmount.
3386
*/
3387
WARN_ON_ONCE(ci->i_auth_cap);
3388
goto unlock;
3389
}
3390
3391
capsnap->dirty_pages -= nr;
3392
if (capsnap->dirty_pages == 0) {
3393
complete_capsnap = true;
3394
if (!capsnap->writing) {
3395
if (ceph_try_drop_cap_snap(ci, capsnap)) {
3396
put++;
3397
} else {
3398
ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
3399
flush_snaps = true;
3400
}
3401
}
3402
}
3403
doutc(cl, "%p %llx.%llx cap_snap %p snap %lld %d/%d -> %d/%d %s%s\n",
3404
inode, ceph_vinop(inode), capsnap, capsnap->context->seq,
3405
ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
3406
ci->i_wrbuffer_ref, capsnap->dirty_pages,
3407
last ? " (wrbuffer last)" : "",
3408
complete_capsnap ? " (complete capsnap)" : "");
3409
}
3410
3411
unlock:
3412
spin_unlock(&ci->i_ceph_lock);
3413
3414
if (last) {
3415
ceph_check_caps(ci, 0);
3416
} else if (flush_snaps) {
3417
ceph_flush_snaps(ci, NULL);
3418
}
3419
if (complete_capsnap)
3420
wake_up_all(&ci->i_cap_wq);
3421
while (put-- > 0) {
3422
iput(inode);
3423
}
3424
}
3425
3426
/*
3427
* Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
3428
*/
3429
static void invalidate_aliases(struct inode *inode)
3430
{
3431
struct ceph_client *cl = ceph_inode_to_client(inode);
3432
struct dentry *dn, *prev = NULL;
3433
3434
doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
3435
d_prune_aliases(inode);
3436
/*
3437
* For non-directory inode, d_find_alias() only returns
3438
* hashed dentry. After calling d_invalidate(), the
3439
* dentry becomes unhashed.
3440
*
3441
* For directory inode, d_find_alias() can return
3442
* unhashed dentry. But directory inode should have
3443
* one alias at most.
3444
*/
3445
while ((dn = d_find_alias(inode))) {
3446
if (dn == prev) {
3447
dput(dn);
3448
break;
3449
}
3450
d_invalidate(dn);
3451
if (prev)
3452
dput(prev);
3453
prev = dn;
3454
}
3455
if (prev)
3456
dput(prev);
3457
}
3458
3459
struct cap_extra_info {
3460
struct ceph_string *pool_ns;
3461
/* inline data */
3462
u64 inline_version;
3463
void *inline_data;
3464
u32 inline_len;
3465
/* dirstat */
3466
bool dirstat_valid;
3467
u64 nfiles;
3468
u64 nsubdirs;
3469
u64 change_attr;
3470
/* currently issued */
3471
int issued;
3472
struct timespec64 btime;
3473
u8 *fscrypt_auth;
3474
u32 fscrypt_auth_len;
3475
u64 fscrypt_file_size;
3476
};
3477
3478
/*
3479
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
3480
* actually be a revocation if it specifies a smaller cap set.)
3481
*
3482
* caller holds s_mutex and i_ceph_lock, we drop both.
3483
*/
3484
static void handle_cap_grant(struct inode *inode,
3485
struct ceph_mds_session *session,
3486
struct ceph_cap *cap,
3487
struct ceph_mds_caps *grant,
3488
struct ceph_buffer *xattr_buf,
3489
struct cap_extra_info *extra_info)
3490
__releases(ci->i_ceph_lock)
3491
__releases(session->s_mdsc->snap_rwsem)
3492
{
3493
struct ceph_client *cl = ceph_inode_to_client(inode);
3494
struct ceph_inode_info *ci = ceph_inode(inode);
3495
int seq = le32_to_cpu(grant->seq);
3496
int newcaps = le32_to_cpu(grant->caps);
3497
int used, wanted, dirty;
3498
u64 size = le64_to_cpu(grant->size);
3499
u64 max_size = le64_to_cpu(grant->max_size);
3500
unsigned char check_caps = 0;
3501
bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
3502
bool wake = false;
3503
bool writeback = false;
3504
bool queue_trunc = false;
3505
bool queue_invalidate = false;
3506
bool deleted_inode = false;
3507
bool fill_inline = false;
3508
bool revoke_wait = false;
3509
int flags = 0;
3510
3511
/*
3512
* If there is at least one crypto block then we'll trust
3513
* fscrypt_file_size. If the real length of the file is 0, then
3514
* ignore it (it has probably been truncated down to 0 by the MDS).
3515
*/
3516
if (IS_ENCRYPTED(inode) && size)
3517
size = extra_info->fscrypt_file_size;
3518
3519
doutc(cl, "%p %llx.%llx cap %p mds%d seq %d %s\n", inode,
3520
ceph_vinop(inode), cap, session->s_mds, seq,
3521
ceph_cap_string(newcaps));
3522
doutc(cl, " size %llu max_size %llu, i_size %llu\n", size,
3523
max_size, i_size_read(inode));
3524
3525
3526
/*
3527
* If CACHE is being revoked, and we have no dirty buffers,
3528
* try to invalidate (once). (If there are dirty buffers, we
3529
* will invalidate _after_ writeback.)
3530
*/
3531
if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
3532
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
3533
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
3534
!(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
3535
if (try_nonblocking_invalidate(inode)) {
3536
/* there were locked pages... invalidate later
3537
in a separate thread. */
3538
if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
3539
queue_invalidate = true;
3540
ci->i_rdcache_revoking = ci->i_rdcache_gen;
3541
}
3542
}
3543
}
3544
3545
if (was_stale)
3546
cap->issued = cap->implemented = CEPH_CAP_PIN;
3547
3548
/*
3549
* auth mds of the inode changed. we received the cap export message,
3550
* but still haven't received the cap import message. handle_cap_export
3551
* updated the new auth MDS' cap.
3552
*
3553
* "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
3554
* that was sent before the cap import message. So don't remove caps.
3555
*/
3556
if (ceph_seq_cmp(seq, cap->seq) <= 0) {
3557
WARN_ON(cap != ci->i_auth_cap);
3558
WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
3559
seq = cap->seq;
3560
newcaps |= cap->issued;
3561
}
3562
3563
/* side effects now are allowed */
3564
cap->cap_gen = atomic_read(&session->s_cap_gen);
3565
cap->seq = seq;
3566
3567
__check_cap_issue(ci, cap, newcaps);
3568
3569
inode_set_max_iversion_raw(inode, extra_info->change_attr);
3570
3571
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
3572
(extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
3573
umode_t mode = le32_to_cpu(grant->mode);
3574
3575
if (inode_wrong_type(inode, mode))
3576
pr_warn_once("inode type changed! (ino %llx.%llx is 0%o, mds says 0%o)\n",
3577
ceph_vinop(inode), inode->i_mode, mode);
3578
else
3579
inode->i_mode = mode;
3580
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
3581
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
3582
ci->i_btime = extra_info->btime;
3583
doutc(cl, "%p %llx.%llx mode 0%o uid.gid %d.%d\n", inode,
3584
ceph_vinop(inode), inode->i_mode,
3585
from_kuid(&init_user_ns, inode->i_uid),
3586
from_kgid(&init_user_ns, inode->i_gid));
3587
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
3588
if (ci->fscrypt_auth_len != extra_info->fscrypt_auth_len ||
3589
memcmp(ci->fscrypt_auth, extra_info->fscrypt_auth,
3590
ci->fscrypt_auth_len))
3591
pr_warn_ratelimited_client(cl,
3592
"cap grant attempt to change fscrypt_auth on non-I_NEW inode (old len %d new len %d)\n",
3593
ci->fscrypt_auth_len,
3594
extra_info->fscrypt_auth_len);
3595
#endif
3596
}
3597
3598
if ((newcaps & CEPH_CAP_LINK_SHARED) &&
3599
(extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
3600
set_nlink(inode, le32_to_cpu(grant->nlink));
3601
if (inode->i_nlink == 0)
3602
deleted_inode = true;
3603
}
3604
3605
if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
3606
grant->xattr_len) {
3607
int len = le32_to_cpu(grant->xattr_len);
3608
u64 version = le64_to_cpu(grant->xattr_version);
3609
3610
if (version > ci->i_xattrs.version) {
3611
doutc(cl, " got new xattrs v%llu on %p %llx.%llx len %d\n",
3612
version, inode, ceph_vinop(inode), len);
3613
if (ci->i_xattrs.blob)
3614
ceph_buffer_put(ci->i_xattrs.blob);
3615
ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
3616
ci->i_xattrs.version = version;
3617
ceph_forget_all_cached_acls(inode);
3618
ceph_security_invalidate_secctx(inode);
3619
}
3620
}
3621
3622
if (newcaps & CEPH_CAP_ANY_RD) {
3623
struct timespec64 mtime, atime, ctime;
3624
/* ctime/mtime/atime? */
3625
ceph_decode_timespec64(&mtime, &grant->mtime);
3626
ceph_decode_timespec64(&atime, &grant->atime);
3627
ceph_decode_timespec64(&ctime, &grant->ctime);
3628
ceph_fill_file_time(inode, extra_info->issued,
3629
le32_to_cpu(grant->time_warp_seq),
3630
&ctime, &mtime, &atime);
3631
}
3632
3633
if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
3634
ci->i_files = extra_info->nfiles;
3635
ci->i_subdirs = extra_info->nsubdirs;
3636
}
3637
3638
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
3639
/* file layout may have changed */
3640
s64 old_pool = ci->i_layout.pool_id;
3641
struct ceph_string *old_ns;
3642
3643
ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
3644
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
3645
lockdep_is_held(&ci->i_ceph_lock));
3646
rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
3647
3648
if (ci->i_layout.pool_id != old_pool ||
3649
extra_info->pool_ns != old_ns)
3650
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
3651
3652
extra_info->pool_ns = old_ns;
3653
3654
/* size/truncate_seq? */
3655
queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
3656
le32_to_cpu(grant->truncate_seq),
3657
le64_to_cpu(grant->truncate_size),
3658
size);
3659
}
3660
3661
if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
3662
if (max_size != ci->i_max_size) {
3663
doutc(cl, "max_size %lld -> %llu\n", ci->i_max_size,
3664
max_size);
3665
ci->i_max_size = max_size;
3666
if (max_size >= ci->i_wanted_max_size) {
3667
ci->i_wanted_max_size = 0; /* reset */
3668
ci->i_requested_max_size = 0;
3669
}
3670
wake = true;
3671
}
3672
}
3673
3674
/* check cap bits */
3675
wanted = __ceph_caps_wanted(ci);
3676
used = __ceph_caps_used(ci);
3677
dirty = __ceph_caps_dirty(ci);
3678
doutc(cl, " my wanted = %s, used = %s, dirty %s\n",
3679
ceph_cap_string(wanted), ceph_cap_string(used),
3680
ceph_cap_string(dirty));
3681
3682
if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
3683
(wanted & ~(cap->mds_wanted | newcaps))) {
3684
/*
3685
* If mds is importing cap, prior cap messages that update
3686
* 'wanted' may get dropped by mds (migrate seq mismatch).
3687
*
3688
* We don't send a cap message to update 'wanted' if what we
3689
* want is already issued. If the mds revokes caps, the cap message
3690
* that releases caps also tells the mds what we want. But if
3691
* caps were revoked forcibly by the mds (stale session), we may
3692
* not have told the mds what we want.
3693
*/
3694
check_caps = 1;
3695
}

	/* revocation, grant, or no-op? */
	if (cap->issued & ~newcaps) {
		int revoking = cap->issued & ~newcaps;

		doutc(cl, "revocation: %s -> %s (revoking %s)\n",
		      ceph_cap_string(cap->issued), ceph_cap_string(newcaps),
		      ceph_cap_string(revoking));
		if (S_ISREG(inode->i_mode) &&
		    (revoking & used & CEPH_CAP_FILE_BUFFER)) {
			writeback = true;  /* initiate writeback; will delay ack */
			revoke_wait = true;
		} else if (queue_invalidate &&
			   revoking == CEPH_CAP_FILE_CACHE &&
			   (newcaps & CEPH_CAP_FILE_LAZYIO) == 0) {
			revoke_wait = true; /* do nothing yet, invalidation will be queued */
		} else if (cap == ci->i_auth_cap) {
			check_caps = 1; /* check auth cap only */
		} else {
			check_caps = 2; /* check all caps */
		}
		/* If there are new caps, try to wake up the waiters */
		if (~cap->issued & newcaps)
			wake = true;
		cap->issued = newcaps;
		cap->implemented |= newcaps;
	} else if (cap->issued == newcaps) {
		doutc(cl, "caps unchanged: %s -> %s\n",
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
	} else {
		doutc(cl, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
		      ceph_cap_string(newcaps));
		/* is a non-auth MDS revoking the newly granted caps? */
		if (cap == ci->i_auth_cap &&
		    __ceph_caps_revoking_other(ci, cap, newcaps))
			check_caps = 2;

		cap->issued = newcaps;
		cap->implemented |= newcaps; /* add bits only, to
					      * avoid stepping on a
					      * pending revocation */
		wake = true;
	}
	BUG_ON(cap->issued & ~cap->implemented);

	/* don't let check_caps skip sending a response to MDS for revoke msgs */
	if (!revoke_wait && le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) {
		cap->mds_wanted = 0;
		flags |= CHECK_CAPS_FLUSH_FORCE;
		if (cap == ci->i_auth_cap)
			check_caps = 1; /* check auth cap only */
		else
			check_caps = 2; /* check all caps */
	}

	if (extra_info->inline_version > 0 &&
	    extra_info->inline_version >= ci->i_inline_version) {
		ci->i_inline_version = extra_info->inline_version;
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
			fill_inline = true;
	}

	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
		if (ci->i_auth_cap == cap) {
			if (newcaps & ~extra_info->issued)
				wake = true;

			if (ci->i_requested_max_size > max_size ||
			    !(le32_to_cpu(grant->wanted) & CEPH_CAP_ANY_FILE_WR)) {
				/* re-request max_size if necessary */
				ci->i_requested_max_size = 0;
				wake = true;
			}

			ceph_kick_flushing_inode_caps(session, ci);
		}
		up_read(&session->s_mdsc->snap_rwsem);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (fill_inline)
		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
				      extra_info->inline_len);

	if (queue_trunc)
		ceph_queue_vmtruncate(inode);

	if (writeback)
		/*
		 * queue inode for writeback: we can't actually call
		 * filemap_write_and_wait, etc. from message handler
		 * context.
		 */
		ceph_queue_writeback(inode);
	if (queue_invalidate)
		ceph_queue_invalidate(inode);
	if (deleted_inode)
		invalidate_aliases(inode);
	if (wake)
		wake_up_all(&ci->i_cap_wq);

	mutex_unlock(&session->s_mutex);
	if (check_caps == 1)
		ceph_check_caps(ci, flags | CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL);
	else if (check_caps == 2)
		ceph_check_caps(ci, flags | CHECK_CAPS_NOINVAL);
}
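
/*
 * Editor's note (illustrative, not part of the original source): the
 * revocation handling above reduces to a small decision table. With
 * revoking = cap->issued & ~newcaps, roughly:
 *
 *	if (S_ISREG(inode->i_mode) &&
 *	    (revoking & used & CEPH_CAP_FILE_BUFFER))
 *		writeback = revoke_wait = true;	// flush dirty pages, ack later
 *	else if (revoking == CEPH_CAP_FILE_CACHE && queue_invalidate)
 *		revoke_wait = true;		// queued invalidation acks it
 *	else
 *		check_caps = (cap == ci->i_auth_cap) ? 1 : 2;
 *
 * i.e. the ack to the MDS is delayed only while dirty data or cached
 * pages still depend on the caps being revoked.
 */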

/*
 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
 * MDS has been safely committed.
 */
static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
				 struct ceph_mds_caps *m,
				 struct ceph_mds_session *session,
				 struct ceph_cap *cap)
	__releases(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_flush *cf, *tmp_cf;
	LIST_HEAD(to_remove);
	unsigned seq = le32_to_cpu(m->seq);
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;
	bool drop = false;
	bool wake_ci = false;
	bool wake_mdsc = false;

	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
		/* Is this the one that was flushed? */
		if (cf->tid == flush_tid)
			cleaned = cf->caps;

		/* Is this a capsnap? */
		if (cf->is_capsnap)
			continue;

		if (cf->tid <= flush_tid) {
			/*
			 * An earlier or current tid. The FLUSH_ACK should
			 * represent a superset of this flush's caps.
			 */
			wake_ci |= __detach_cap_flush_from_ci(ci, cf);
			list_add_tail(&cf->i_list, &to_remove);
		} else {
			/*
			 * This is a later one. Any caps in it are still dirty
			 * so don't count them as cleaned.
			 */
			cleaned &= ~cf->caps;
			if (!cleaned)
				break;
		}
	}

	doutc(cl, "%p %llx.%llx mds%d seq %d on %s cleaned %s, flushing %s -> %s\n",
	      inode, ceph_vinop(inode), session->s_mds, seq,
	      ceph_cap_string(dirty), ceph_cap_string(cleaned),
	      ceph_cap_string(ci->i_flushing_caps),
	      ceph_cap_string(ci->i_flushing_caps & ~cleaned));

	if (list_empty(&to_remove) && !cleaned)
		goto out;

	ci->i_flushing_caps &= ~cleaned;

	spin_lock(&mdsc->cap_dirty_lock);

	list_for_each_entry(cf, &to_remove, i_list)
		wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc, cf);

	if (ci->i_flushing_caps == 0) {
		if (list_empty(&ci->i_cap_flush_list)) {
			list_del_init(&ci->i_flushing_item);
			if (!list_empty(&session->s_cap_flushing)) {
				struct inode *inode =
					&list_first_entry(&session->s_cap_flushing,
							  struct ceph_inode_info,
							  i_flushing_item)->netfs.inode;
				doutc(cl, " mds%d still flushing cap on %p %llx.%llx\n",
				      session->s_mds, inode, ceph_vinop(inode));
			}
		}
		mdsc->num_cap_flushing--;
		doutc(cl, " %p %llx.%llx now !flushing\n", inode,
		      ceph_vinop(inode));

		if (ci->i_dirty_caps == 0) {
			doutc(cl, " %p %llx.%llx now clean\n", inode,
			      ceph_vinop(inode));
			BUG_ON(!list_empty(&ci->i_dirty_item));
			drop = true;
			if (ci->i_wr_ref == 0 &&
			    ci->i_wrbuffer_ref_head == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		} else {
			BUG_ON(list_empty(&ci->i_dirty_item));
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);

out:
	spin_unlock(&ci->i_ceph_lock);

	while (!list_empty(&to_remove)) {
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del_init(&cf->i_list);
		if (!cf->is_capsnap)
			ceph_free_cap_flush(cf);
	}

	if (wake_ci)
		wake_up_all(&ci->i_cap_wq);
	if (wake_mdsc)
		wake_up_all(&mdsc->cap_flushing_wq);
	if (drop)
		iput(inode);
}
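
/*
 * Editor's worked example (not part of the original source) of the
 * FLUSH_ACK matching above. Suppose i_cap_flush_list holds flushes with
 * tids 5 (Fw), 7 (AxFw) and 9 (Fx), and an ack for tid 7 arrives:
 *
 *	tid 5: tid <= flush_tid	-> detached, moved to to_remove
 *	tid 7: cleaned = AxFw	-> detached, moved to to_remove
 *	tid 9: a later flush	-> cleaned &= ~Fx (Fx stays flushing)
 *
 * Only the surviving "cleaned" bits are removed from i_flushing_caps.
 */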

void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
			   bool *wake_ci, bool *wake_mdsc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	bool ret;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing capsnap %p, %p %llx.%llx ci %p\n", capsnap,
	      inode, ceph_vinop(inode), ci);

	list_del_init(&capsnap->ci_item);
	ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
	if (wake_ci)
		*wake_ci = ret;

	spin_lock(&mdsc->cap_dirty_lock);
	if (list_empty(&ci->i_cap_flush_list))
		list_del_init(&ci->i_flushing_item);

	ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
	if (wake_mdsc)
		*wake_mdsc = ret;
	spin_unlock(&mdsc->cap_dirty_lock);
}

void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
			 bool *wake_ci, bool *wake_mdsc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	lockdep_assert_held(&ci->i_ceph_lock);

	WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
	__ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
}

/*
 * Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can
 * throw away our cap_snap.
 *
 * Caller holds s_mutex.
 */
static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
				     struct ceph_mds_caps *m,
				     struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	u64 follows = le64_to_cpu(m->snap_follows);
	struct ceph_cap_snap *capsnap = NULL, *iter;
	bool wake_ci = false;
	bool wake_mdsc = false;

	doutc(cl, "%p %llx.%llx ci %p mds%d follows %lld\n", inode,
	      ceph_vinop(inode), ci, session->s_mds, follows);

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
		if (iter->follows == follows) {
			if (iter->cap_flush.tid != flush_tid) {
				doutc(cl, " cap_snap %p follows %lld "
				      "tid %lld != %lld\n", iter,
				      follows, flush_tid,
				      iter->cap_flush.tid);
				break;
			}
			capsnap = iter;
			break;
		} else {
			doutc(cl, " skipping cap_snap %p follows %lld\n",
			      iter, iter->follows);
		}
	}
	if (capsnap)
		ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
	spin_unlock(&ci->i_ceph_lock);

	if (capsnap) {
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
		if (wake_ci)
			wake_up_all(&ci->i_cap_wq);
		if (wake_mdsc)
			wake_up_all(&mdsc->cap_flushing_wq);
		iput(inode);
	}
}
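
/*
 * Editor's sketch (not part of the original source): the lookup above
 * keys on snap_follows first and only then cross-checks the flush tid,
 * conceptually:
 *
 *	list_for_each_entry(iter, &ci->i_cap_snaps, ci_item)
 *		if (iter->follows == follows &&
 *		    iter->cap_flush.tid == flush_tid)
 *			// -> this is the capsnap to remove
 *
 * A follows match with a mismatched tid aborts the walk instead, so a
 * stale or duplicate ack never drops the wrong cap_snap.
 */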

/*
 * Handle TRUNC from MDS, indicating file truncation.
 *
 * Caller holds s_mutex.
 */
static bool handle_cap_trunc(struct inode *inode,
			     struct ceph_mds_caps *trunc,
			     struct ceph_mds_session *session,
			     struct cap_extra_info *extra_info)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int mds = session->s_mds;
	int seq = le32_to_cpu(trunc->seq);
	u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
	u64 truncate_size = le64_to_cpu(trunc->truncate_size);
	u64 size = le64_to_cpu(trunc->size);
	int implemented = 0;
	int dirty = __ceph_caps_dirty(ci);
	int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
	bool queue_trunc = false;

	lockdep_assert_held(&ci->i_ceph_lock);

	issued |= implemented | dirty;

	/*
	 * If there is at least one crypto block then we'll trust
	 * fscrypt_file_size. If the real length of the file is 0, then
	 * ignore it (it has probably been truncated down to 0 by the MDS).
	 */
	if (IS_ENCRYPTED(inode) && size)
		size = extra_info->fscrypt_file_size;

	doutc(cl, "%p %llx.%llx mds%d seq %d to %lld truncate seq %d\n",
	      inode, ceph_vinop(inode), mds, seq, truncate_size, truncate_seq);
	queue_trunc = ceph_fill_file_size(inode, issued,
					  truncate_seq, truncate_size, size);
	return queue_trunc;
}
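
/*
 * Editor's note (illustrative, not part of the original source): for
 * encrypted inodes the size in the TRUNC message is the crypto-block
 * aligned size, so the handler substitutes the real length carried in
 * the fscrypt fields, e.g. (hypothetical numbers):
 *
 *	trunc->size                   = 8192	// two 4 KiB crypto blocks
 *	extra_info->fscrypt_file_size = 5000	// actual i_size
 *	-> ceph_fill_file_size(..., 5000)	// when IS_ENCRYPTED(inode)
 *
 * A size of 0 is taken at face value: the MDS truncated the file to 0.
 */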

/*
 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a
 * different one. If we are the most recent migration we've seen (as
 * indicated by mseq), make note of the migrating cap bits for the
 * duration (until we see the corresponding IMPORT).
 *
 * Caller holds s_mutex.
 */
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *tsession = NULL;
	struct ceph_cap *cap, *tcap, *new_cap = NULL;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 t_cap_id;
	u32 t_issue_seq, t_mseq;
	int target, issued;
	int mds = session->s_mds;

	if (ph) {
		t_cap_id = le64_to_cpu(ph->cap_id);
		t_issue_seq = le32_to_cpu(ph->issue_seq);
		t_mseq = le32_to_cpu(ph->mseq);
		target = le32_to_cpu(ph->mds);
	} else {
		t_cap_id = t_issue_seq = t_mseq = 0;
		target = -1;
	}

	doutc(cl, " cap %llx.%llx export to peer %d piseq %u pmseq %u\n",
	      ceph_vinop(inode), target, t_issue_seq, t_mseq);
retry:
	down_read(&mdsc->snap_rwsem);
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
		goto out_unlock;

	if (target < 0) {
		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	}

	/*
	 * Now we know we haven't received the cap import message yet,
	 * because the exported cap still exists.
	 */

	issued = cap->issued;
	if (issued != cap->implemented)
		pr_err_ratelimited_client(cl, "issued != implemented: "
					  "%p %llx.%llx mds%d seq %d mseq %d"
					  " issued %s implemented %s\n",
					  inode, ceph_vinop(inode), mds,
					  cap->seq, cap->mseq,
					  ceph_cap_string(issued),
					  ceph_cap_string(cap->implemented));

	tcap = __get_cap_for_mds(ci, target);
	if (tcap) {
		/* already have caps from the target */
		if (tcap->cap_id == t_cap_id &&
		    ceph_seq_cmp(tcap->seq, t_issue_seq) < 0) {
			doutc(cl, " updating import cap %p mds%d\n", tcap,
			      target);
			tcap->cap_id = t_cap_id;
			tcap->seq = t_issue_seq - 1;
			tcap->issue_seq = t_issue_seq - 1;
			tcap->issued |= issued;
			tcap->implemented |= issued;
			if (cap == ci->i_auth_cap) {
				ci->i_auth_cap = tcap;
				change_auth_cap_ses(ci, tcap->session);
			}
		}
		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	} else if (tsession) {
		/* add placeholder for the export target */
		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
		tcap = new_cap;
		ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
			     t_issue_seq - 1, t_mseq, (u64)-1, flag, &new_cap);

		if (!list_empty(&ci->i_cap_flush_list) &&
		    ci->i_auth_cap == tcap) {
			spin_lock(&mdsc->cap_dirty_lock);
			list_move_tail(&ci->i_flushing_item,
				       &tcap->session->s_cap_flushing);
			spin_unlock(&mdsc->cap_dirty_lock);
		}

		ceph_remove_cap(mdsc, cap, false);
		goto out_unlock;
	}

	spin_unlock(&ci->i_ceph_lock);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);

	/* open target session */
	tsession = ceph_mdsc_open_export_target_session(mdsc, target);
	if (!IS_ERR(tsession)) {
		if (mds > target) {
			mutex_lock(&session->s_mutex);
			mutex_lock_nested(&tsession->s_mutex,
					  SINGLE_DEPTH_NESTING);
		} else {
			mutex_lock(&tsession->s_mutex);
			mutex_lock_nested(&session->s_mutex,
					  SINGLE_DEPTH_NESTING);
		}
		new_cap = ceph_get_cap(mdsc, NULL);
	} else {
		WARN_ON(1);
		tsession = NULL;
		target = -1;
		mutex_lock(&session->s_mutex);
	}
	goto retry;

out_unlock:
	spin_unlock(&ci->i_ceph_lock);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
	if (tsession) {
		mutex_unlock(&tsession->s_mutex);
		ceph_put_mds_session(tsession);
	}
	if (new_cap)
		ceph_put_cap(mdsc, new_cap);
}
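
/*
 * Editor's note (illustrative, not part of the original source): the
 * retry dance above exists because the target session cannot be opened
 * or locked while i_ceph_lock is held, and two session mutexes must be
 * taken in a fixed order (higher mds rank first) to avoid an ABBA
 * deadlock between concurrent export handlers:
 *
 *	if (mds > target) {
 *		mutex_lock(&session->s_mutex);
 *		mutex_lock_nested(&tsession->s_mutex, SINGLE_DEPTH_NESTING);
 *	} else {
 *		mutex_lock(&tsession->s_mutex);
 *		mutex_lock_nested(&session->s_mutex, SINGLE_DEPTH_NESTING);
 *	}
 *
 * After re-locking, the state is re-checked from scratch at "retry:".
 */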

/*
 * Handle cap IMPORT.
 *
 * Caller holds s_mutex and i_ceph_lock (which may be dropped and
 * re-acquired while a new cap is allocated).
 */
static void handle_cap_import(struct ceph_mds_client *mdsc,
			      struct inode *inode, struct ceph_mds_caps *im,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session,
			      struct ceph_cap **target_cap, int *old_issued)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap *cap, *ocap, *new_cap = NULL;
	int mds = session->s_mds;
	int issued;
	unsigned caps = le32_to_cpu(im->caps);
	unsigned wanted = le32_to_cpu(im->wanted);
	unsigned seq = le32_to_cpu(im->seq);
	unsigned mseq = le32_to_cpu(im->migrate_seq);
	u64 realmino = le64_to_cpu(im->realm);
	u64 cap_id = le64_to_cpu(im->cap_id);
	u64 p_cap_id;
	u32 piseq = 0;
	u32 pmseq = 0;
	int peer;

	if (ph) {
		p_cap_id = le64_to_cpu(ph->cap_id);
		peer = le32_to_cpu(ph->mds);
		piseq = le32_to_cpu(ph->issue_seq);
		pmseq = le32_to_cpu(ph->mseq);
	} else {
		p_cap_id = 0;
		peer = -1;
	}

	doutc(cl, " cap %llx.%llx import from peer %d piseq %u pmseq %u\n",
	      ceph_vinop(inode), peer, piseq, pmseq);
retry:
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		if (!new_cap) {
			spin_unlock(&ci->i_ceph_lock);
			new_cap = ceph_get_cap(mdsc, NULL);
			spin_lock(&ci->i_ceph_lock);
			goto retry;
		}
		cap = new_cap;
	} else {
		if (new_cap) {
			ceph_put_cap(mdsc, new_cap);
			new_cap = NULL;
		}
	}

	__ceph_caps_issued(ci, &issued);
	issued |= __ceph_caps_dirty(ci);

	ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);

	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
	if (ocap && ocap->cap_id == p_cap_id) {
		doutc(cl, " remove export cap %p mds%d flags %d\n",
		      ocap, peer, ph->flags);
		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
		    (ocap->seq != piseq ||
		     ocap->mseq != pmseq)) {
			pr_err_ratelimited_client(cl, "mismatched seq/mseq: "
					"%p %llx.%llx mds%d seq %d mseq %d"
					" importer mds%d has peer seq %d mseq %d\n",
					inode, ceph_vinop(inode), peer,
					ocap->seq, ocap->mseq, mds, piseq, pmseq);
		}
		ceph_remove_cap(mdsc, ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
	}

	*old_issued = issued;
	*target_cap = cap;
}
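
/*
 * Editor's sketch (not part of the original source): the "retry:" loop
 * above is the usual unlock/allocate/relock pattern for allocating
 * something that may sleep while a spinlock is held:
 *
 *	spin_unlock(&ci->i_ceph_lock);
 *	new_cap = ceph_get_cap(mdsc, NULL);	// may sleep
 *	spin_lock(&ci->i_ceph_lock);
 *	goto retry;				// re-check under the lock
 *
 * If a cap for this mds appeared while the lock was dropped, the
 * preallocated one is released again via ceph_put_cap().
 */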

#ifdef CONFIG_FS_ENCRYPTION
static int parse_fscrypt_fields(void **p, void *end,
				struct cap_extra_info *extra)
{
	u32 len;

	ceph_decode_32_safe(p, end, extra->fscrypt_auth_len, bad);
	if (extra->fscrypt_auth_len) {
		ceph_decode_need(p, end, extra->fscrypt_auth_len, bad);
		extra->fscrypt_auth = kmalloc(extra->fscrypt_auth_len,
					      GFP_KERNEL);
		if (!extra->fscrypt_auth)
			return -ENOMEM;
		ceph_decode_copy_safe(p, end, extra->fscrypt_auth,
				      extra->fscrypt_auth_len, bad);
	}

	ceph_decode_32_safe(p, end, len, bad);
	if (len >= sizeof(u64)) {
		ceph_decode_64_safe(p, end, extra->fscrypt_file_size, bad);
		len -= sizeof(u64);
	}
	ceph_decode_skip_n(p, end, len, bad);
	return 0;
bad:
	return -EIO;
}
#else
static int parse_fscrypt_fields(void **p, void *end,
				struct cap_extra_info *extra)
{
	u32 len;

	/* Don't care about these fields unless we're encryption-capable */
	ceph_decode_32_safe(p, end, len, bad);
	if (len)
		ceph_decode_skip_n(p, end, len, bad);
	ceph_decode_32_safe(p, end, len, bad);
	if (len)
		ceph_decode_skip_n(p, end, len, bad);
	return 0;
bad:
	return -EIO;
}
#endif
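
/*
 * Editor's note (derived from the decoders above, not in the original
 * source): both variants consume the same wire layout at the tail of a
 * v12+ cap message:
 *
 *	u32 fscrypt_auth_len
 *	u8  fscrypt_auth[fscrypt_auth_len]
 *	u32 fscrypt_file_len
 *	u64 fscrypt_file_size		// if fscrypt_file_len >= 8
 *	u8  padding[fscrypt_file_len - 8]
 *
 * The !CONFIG_FS_ENCRYPTION variant still has to skip both blobs so
 * that any later fields keep decoding at the correct offset.
 */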

/*
 * Handle a caps message from the MDS.
 *
 * Identify the appropriate session, inode, and call the right handler
 * based on the cap op.
 */
void ceph_handle_caps(struct ceph_mds_session *session,
		      struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	struct ceph_mds_caps *h;
	struct ceph_mds_cap_peer *peer = NULL;
	struct ceph_snap_realm *realm = NULL;
	int op;
	int msg_version = le16_to_cpu(msg->hdr.version);
	u32 seq, mseq, issue_seq;
	struct ceph_vino vino;
	void *snaptrace;
	size_t snaptrace_len;
	void *p, *end;
	struct cap_extra_info extra_info = {};
	bool queue_trunc;
	bool close_sessions = false;
	bool do_cap_release = false;

	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	end = msg->front.iov_base + msg->front.iov_len;
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	h = msg->front.iov_base;
	op = le32_to_cpu(h->op);
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	mseq = le32_to_cpu(h->migrate_seq);
	issue_seq = le32_to_cpu(h->issue_seq);

	snaptrace = h + 1;
	snaptrace_len = le32_to_cpu(h->snap_trace_len);
	p = snaptrace + snaptrace_len;

	if (msg_version >= 2) {
		u32 flock_len;
		ceph_decode_32_safe(&p, end, flock_len, bad);
		if (p + flock_len > end)
			goto bad;
		p += flock_len;
	}

	if (msg_version >= 3) {
		if (op == CEPH_CAP_OP_IMPORT) {
			if (p + sizeof(*peer) > end)
				goto bad;
			peer = p;
			p += sizeof(*peer);
		} else if (op == CEPH_CAP_OP_EXPORT) {
			/* recorded in unused fields */
			peer = (void *)&h->size;
		}
	}

	if (msg_version >= 4) {
		ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
		ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
		if (p + extra_info.inline_len > end)
			goto bad;
		extra_info.inline_data = p;
		p += extra_info.inline_len;
	}

	if (msg_version >= 5) {
		struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
		u32 epoch_barrier;

		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
	}

	if (msg_version >= 8) {
		u32 pool_ns_len;

		/* version >= 6 */
		ceph_decode_skip_64(&p, end, bad);	// flush_tid
		/* version >= 7 */
		ceph_decode_skip_32(&p, end, bad);	// caller_uid
		ceph_decode_skip_32(&p, end, bad);	// caller_gid
		/* version >= 8 */
		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
		if (pool_ns_len > 0) {
			ceph_decode_need(&p, end, pool_ns_len, bad);
			extra_info.pool_ns =
				ceph_find_or_create_string(p, pool_ns_len);
			p += pool_ns_len;
		}
	}

	if (msg_version >= 9) {
		struct ceph_timespec *btime;

		if (p + sizeof(*btime) > end)
			goto bad;
		btime = p;
		ceph_decode_timespec64(&extra_info.btime, btime);
		p += sizeof(*btime);
		ceph_decode_64_safe(&p, end, extra_info.change_attr, bad);
	}

	if (msg_version >= 11) {
		/* version >= 10 */
		ceph_decode_skip_32(&p, end, bad);	// flags
		/* version >= 11 */
		extra_info.dirstat_valid = true;
		ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
		ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
	}

	if (msg_version >= 12) {
		if (parse_fscrypt_fields(&p, end, &extra_info))
			goto bad;
	}

	/* lookup ino */
	inode = ceph_find_inode(mdsc->fsc->sb, vino);
	doutc(cl, " caps mds%d op %s ino %llx.%llx inode %p seq %u iseq %u mseq %u\n",
	      session->s_mds, ceph_cap_op_name(op), vino.ino, vino.snap, inode,
	      seq, issue_seq, mseq);

	mutex_lock(&session->s_mutex);

	if (!inode) {
		doutc(cl, " i don't have ino %llx\n", vino.ino);

		switch (op) {
		case CEPH_CAP_OP_IMPORT:
		case CEPH_CAP_OP_REVOKE:
		case CEPH_CAP_OP_GRANT:
			do_cap_release = true;
			break;
		default:
			break;
		}
		goto flush_cap_releases;
	}
	ci = ceph_inode(inode);

	/* these will work even if we don't have a cap yet */
	switch (op) {
	case CEPH_CAP_OP_FLUSHSNAP_ACK:
		handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
					 h, session);
		goto done;

	case CEPH_CAP_OP_EXPORT:
		handle_cap_export(inode, h, peer, session);
		goto done_unlocked;

	case CEPH_CAP_OP_IMPORT:
		realm = NULL;
		if (snaptrace_len) {
			down_write(&mdsc->snap_rwsem);
			if (ceph_update_snap_trace(mdsc, snaptrace,
						   snaptrace + snaptrace_len,
						   false, &realm)) {
				up_write(&mdsc->snap_rwsem);
				close_sessions = true;
				goto done;
			}
			downgrade_write(&mdsc->snap_rwsem);
		} else {
			down_read(&mdsc->snap_rwsem);
		}
		spin_lock(&ci->i_ceph_lock);
		handle_cap_import(mdsc, inode, h, peer, session,
				  &cap, &extra_info.issued);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		if (realm)
			ceph_put_snap_realm(mdsc, realm);
		goto done_unlocked;
	}

	/* the rest require a cap */
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
	if (!cap) {
		doutc(cl, " no cap on %p ino %llx.%llx from mds%d\n",
		      inode, ceph_ino(inode), ceph_snap(inode),
		      session->s_mds);
		spin_unlock(&ci->i_ceph_lock);
		switch (op) {
		case CEPH_CAP_OP_REVOKE:
		case CEPH_CAP_OP_GRANT:
			do_cap_release = true;
			break;
		default:
			break;
		}
		goto flush_cap_releases;
	}

	/* note that each of these drops i_ceph_lock for us */
	switch (op) {
	case CEPH_CAP_OP_REVOKE:
	case CEPH_CAP_OP_GRANT:
		__ceph_caps_issued(ci, &extra_info.issued);
		extra_info.issued |= __ceph_caps_dirty(ci);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		goto done_unlocked;

	case CEPH_CAP_OP_FLUSH_ACK:
		handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
				     h, session, cap);
		break;

	case CEPH_CAP_OP_TRUNC:
		queue_trunc = handle_cap_trunc(inode, h, session,
					       &extra_info);
		spin_unlock(&ci->i_ceph_lock);
		if (queue_trunc)
			ceph_queue_vmtruncate(inode);
		break;

	default:
		spin_unlock(&ci->i_ceph_lock);
		pr_err_client(cl, "unknown cap op %d %s\n", op,
			      ceph_cap_op_name(op));
	}

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
out:
	ceph_dec_mds_stopping_blocker(mdsc);

	ceph_put_string(extra_info.pool_ns);

	/* Defer closing the sessions until after s_mutex has been released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);

	kfree(extra_info.fscrypt_auth);
	return;

flush_cap_releases:
	/*
	 * Send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	if (do_cap_release) {
		cap = ceph_get_cap(mdsc, NULL);
		cap->cap_ino = vino.ino;
		cap->queue_release = 1;
		cap->cap_id = le64_to_cpu(h->cap_id);
		cap->mseq = mseq;
		cap->seq = seq;
		cap->issue_seq = seq;
		spin_lock(&session->s_cap_lock);
		__ceph_queue_cap_release(session, cap);
		spin_unlock(&session->s_cap_lock);
	}
	ceph_flush_session_cap_releases(mdsc, session);
	goto done;

bad:
	pr_err_client(cl, "corrupt message\n");
	ceph_msg_dump(msg);
	goto out;
}
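
/*
 * Editor's summary (derived from the decode above, not in the original
 * source): fields appended by successive cap message versions:
 *
 *	v2:  u32 flock_len + flock blob
 *	v3:  peer record (IMPORT: inline; EXPORT: reuses h->size area)
 *	v4:  u64 inline_version, u32 inline_len + inline data
 *	v5:  u32 osd epoch barrier
 *	v6:  u64 flush_tid (skipped here)
 *	v7:  u32 caller_uid, u32 caller_gid (skipped here)
 *	v8:  u32 pool_ns_len + pool namespace string
 *	v9:  ceph_timespec btime, u64 change_attr
 *	v10: u32 flags (skipped here)
 *	v11: u64 nfiles, u64 nsubdirs (dirstat)
 *	v12: fscrypt fields (see parse_fscrypt_fields())
 */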

/*
 * Delayed work handler to process the end of the delayed cap release
 * LRU list.
 *
 * If new caps are added to the list while it is being processed, they
 * won't get handled in this run. In that case, ci->i_hold_caps_max is
 * returned so that the work can be rescheduled accordingly.
 */
unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
	unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
	unsigned long loop_start = jiffies;
	unsigned long delay = 0;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_delay_list)) {
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
			doutc(cl, "caps added recently. Exiting loop\n");
			delay = ci->i_hold_caps_max;
			break;
		}
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->netfs.inode);
		if (inode) {
			spin_unlock(&mdsc->cap_delay_lock);
			doutc(cl, "on %p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			ceph_check_caps(ci, 0);
			iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}

		/*
		 * Make sure too many dirty caps or general
		 * slowness doesn't block mdsc delayed work,
		 * preventing send_renew_caps() from running.
		 */
		if (time_after_eq(jiffies, loop_start + 5 * HZ))
			break;
	}
	spin_unlock(&mdsc->cap_delay_lock);
	doutc(cl, "done\n");

	return delay;
}
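
/*
 * Editor's sketch (hypothetical caller, not from this file): the return
 * value is the absolute i_hold_caps_max (in jiffies) of the first entry
 * that was skipped, so the delayed-work owner can re-arm itself roughly
 * like:
 *
 *	unsigned long delay = ceph_check_delayed_caps(mdsc);
 *
 *	if (delay)
 *		schedule_delayed_work(&mdsc->delayed_work,
 *				      delay - jiffies);
 *
 * (The real re-arming logic lives in the mds_client delayed work and
 * also enforces its own minimum interval.)
 */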

/*
 * Flush all dirty caps to the mds
 */
static void flush_dirty_session_caps(struct ceph_mds_session *s)
{
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_inode_info *ci;
	struct inode *inode;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&s->s_cap_dirty)) {
		ci = list_first_entry(&s->s_cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->netfs.inode;
		ihold(inode);
		doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
		spin_unlock(&mdsc->cap_dirty_lock);
		ceph_wait_on_async_create(inode);
		ceph_check_caps(ci, CHECK_CAPS_FLUSH);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	doutc(cl, "done\n");
}

void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
}

/*
 * Flush all cap releases to the mds
 */
static void flush_cap_releases(struct ceph_mds_session *s)
{
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "begin\n");
	spin_lock(&s->s_cap_lock);
	if (s->s_num_cap_releases)
		ceph_flush_session_cap_releases(mdsc, s);
	spin_unlock(&s->s_cap_lock);
	doutc(cl, "done\n");
}

void ceph_flush_cap_releases(struct ceph_mds_client *mdsc)
{
	ceph_mdsc_iterate_sessions(mdsc, flush_cap_releases, true);
}

void __ceph_touch_fmode(struct ceph_inode_info *ci,
			struct ceph_mds_client *mdsc, int fmode)
{
	unsigned long now = jiffies;

	if (fmode & CEPH_FILE_MODE_RD)
		ci->i_last_rd = now;
	if (fmode & CEPH_FILE_MODE_WR)
		ci->i_last_wr = now;
	/* queue periodic check */
	if (fmode &&
	    __ceph_is_any_real_caps(ci) &&
	    list_empty(&ci->i_cap_delay_list))
		__cap_delay_requeue(mdsc, ci);
}

void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool already_opened = false;
	int i;

	if (count == 1)
		atomic64_inc(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		/*
		 * If any of the mode refs is larger than 0, the inode has
		 * already been opened by others. Skip the PIN ref (i == 0)
		 * when checking.
		 */
		if (i && ci->i_nr_by_mode[i])
			already_opened = true;

		if (bits & (1 << i))
			ci->i_nr_by_mode[i] += count;
	}

	if (!already_opened)
		percpu_counter_inc(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}
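
/*
 * Editor's worked example (not part of the original source) of the
 * bits = (fmode << 1) | 1 encoding above: bit 0 is the implicit PIN
 * reference, and each file-mode bit shifts up by one, so
 *
 *	fmode = CEPH_FILE_MODE_RD	-> bits = 0b011
 *	-> ci->i_nr_by_mode[0] += count;	// PIN ref
 *	-> ci->i_nr_by_mode[1] += count;	// RD ref
 *
 * ceph_put_fmode() below decrements exactly the same counters.
 */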

/*
 * Drop an open file reference. If we were the last open file,
 * we may need to release capabilities to the MDS (or schedule
 * their delayed release).
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	int bits = (fmode << 1) | 1;
	bool is_closed = true;
	int i;

	if (count == 1)
		atomic64_dec(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] < count);
			ci->i_nr_by_mode[i] -= count;
		}

		/*
		 * If any of the mode refs is still non-zero after the
		 * decrement, the inode is still open elsewhere. Skip the
		 * PIN ref (i == 0) when checking.
		 */
		if (i && ci->i_nr_by_mode[i])
			is_closed = false;
	}

	if (is_closed)
		percpu_counter_dec(&mdsc->metric.opened_inodes);
	spin_unlock(&ci->i_ceph_lock);
}

/*
 * For a soon-to-be unlinked file, drop the LINK caps. If it
 * looks like the link count will hit 0, drop any other caps (other
 * than PIN) we don't specifically want (due to the file still being
 * open).
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

		if (__ceph_caps_dirty(ci)) {
			struct ceph_mds_client *mdsc =
				ceph_inode_to_fs_client(inode)->mdsc;

			doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			spin_lock(&mdsc->cap_delay_lock);
			ci->i_ceph_flags |= CEPH_I_FLUSH;
			if (!list_empty(&ci->i_cap_delay_list))
				list_del_init(&ci->i_cap_delay_list);
			list_add_tail(&ci->i_cap_delay_list,
				      &mdsc->cap_unlink_delay_list);
			spin_unlock(&mdsc->cap_delay_lock);

			/*
			 * Fire the work immediately, because the MDS may
			 * be waiting for the cap release.
			 */
			ceph_queue_cap_unlink_work(mdsc);
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}
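
/*
 * Editor's note (hedged sketch; the real caller is the unlink path in
 * dir.c): the returned mask is meant to be stored in the MDS request so
 * the UNLINK carries the cap release inline, roughly:
 *
 *	req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
 *
 * which saves the MDS a separate revoke round trip before it can
 * process the unlink.
 */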

/*
 * Helpers for embedding cap and dentry lease releases into mds
 * requests.
 *
 * @force is used by dentry_release (below) to force inclusion of a
 * record for the directory inode, even when there aren't any caps to
 * drop.
 */
int ceph_encode_inode_release(void **p, struct inode *inode,
			      int mds, int drop, int unless, int force)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct ceph_mds_request_release *rel = *p;
	int used, dirty;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	doutc(cl, "%p %llx.%llx mds%d used|dirty %s drop %s unless %s\n",
	      inode, ceph_vinop(inode), mds, ceph_cap_string(used|dirty),
	      ceph_cap_string(drop), ceph_cap_string(unless));

	/* only drop unused, clean caps */
	drop &= ~(used | dirty);

	cap = __get_cap_for_mds(ci, mds);
	if (cap && __cap_is_valid(cap)) {
		unless &= cap->issued;
		if (unless) {
			if (unless & CEPH_CAP_AUTH_EXCL)
				drop &= ~CEPH_CAP_AUTH_SHARED;
			if (unless & CEPH_CAP_LINK_EXCL)
				drop &= ~CEPH_CAP_LINK_SHARED;
			if (unless & CEPH_CAP_XATTR_EXCL)
				drop &= ~CEPH_CAP_XATTR_SHARED;
			if (unless & CEPH_CAP_FILE_EXCL)
				drop &= ~CEPH_CAP_FILE_SHARED;
		}

		if (force || (cap->issued & drop)) {
			if (cap->issued & drop) {
				int wanted = __ceph_caps_wanted(ci);
				doutc(cl, "%p %llx.%llx cap %p %s -> %s, "
				      "wanted %s -> %s\n", inode,
				      ceph_vinop(inode), cap,
				      ceph_cap_string(cap->issued),
				      ceph_cap_string(cap->issued & ~drop),
				      ceph_cap_string(cap->mds_wanted),
				      ceph_cap_string(wanted));

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
				if (cap == ci->i_auth_cap &&
				    !(wanted & CEPH_CAP_ANY_FILE_WR))
					ci->i_requested_max_size = 0;
			} else {
				doutc(cl, "%p %llx.%llx cap %p %s (force)\n",
				      inode, ceph_vinop(inode), cap,
				      ceph_cap_string(cap->issued));
			}

			rel->ino = cpu_to_le64(ceph_ino(inode));
			rel->cap_id = cpu_to_le64(cap->cap_id);
			rel->seq = cpu_to_le32(cap->seq);
			rel->issue_seq = cpu_to_le32(cap->issue_seq);
			rel->mseq = cpu_to_le32(cap->mseq);
			rel->caps = cpu_to_le32(cap->implemented);
			rel->wanted = cpu_to_le32(cap->mds_wanted);
			rel->dname_len = 0;
			rel->dname_seq = 0;
			*p += sizeof(*rel);
			ret = 1;
		} else {
			doutc(cl, "%p %llx.%llx cap %p %s (noop)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued));
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}
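
/*
 * Editor's note (derived from the code above, not in the original
 * source): on success, one struct ceph_mds_request_release is appended
 * at *p:
 *
 *	rel->ino        = ceph_ino(inode)
 *	rel->cap_id     = cap->cap_id
 *	rel->seq        = cap->seq
 *	rel->issue_seq  = cap->issue_seq
 *	rel->mseq       = cap->mseq
 *	rel->caps       = cap->implemented (after the drop)
 *	rel->wanted     = cap->mds_wanted
 *	rel->dname_len  = 0	(ceph_encode_dentry_release() may fill this)
 *
 * and *p advances by sizeof(*rel), so further releases or the dentry
 * name can be appended directly behind it.
 */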

/**
 * ceph_encode_dentry_release - encode a dentry release into an outgoing request
 * @p: outgoing request buffer
 * @dentry: dentry to release
 * @dir: dir to release it from
 * @mds: mds that we're speaking to
 * @drop: caps being dropped
 * @unless: unless we have these caps
 *
 * Encode a dentry release into an outgoing request buffer. Returns 1 if a
 * release record was encoded, 0 if nothing was encoded, or a negative error
 * code on failure.
 */
int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	struct ceph_client *cl;
	int force = 0;
	int ret;

	/* This shouldn't happen */
	BUG_ON(!dir);

	/*
	 * Force a record for the directory caps if we have a dentry lease.
	 * This is racy (we can't take i_ceph_lock and d_lock together), but
	 * it doesn't have to be perfect; the mds will revoke anything we
	 * don't release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);

	cl = ceph_inode_to_client(dir);
	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		int len = dentry->d_name.len;
		doutc(cl, "%p mds%d seq %d\n", dentry, mds,
		      (int)di->lease_seq);
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
		memcpy(*p, dentry->d_name.name, len);
		spin_unlock(&dentry->d_lock);
		if (IS_ENCRYPTED(dir) && fscrypt_has_encryption_key(dir)) {
			len = ceph_encode_encrypted_dname(dir, *p, len);
			if (len < 0)
				return len;
		}
		rel->dname_len = cpu_to_le32(len);
		*p += len;
	} else {
		spin_unlock(&dentry->d_lock);
	}
	return ret;
}

static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_snap *capsnap;
	int capsnap_release = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing capsnaps, ci is %p, %p %llx.%llx\n",
	      ci, inode, ceph_vinop(inode));

	while (!list_empty(&ci->i_cap_snaps)) {
		capsnap = list_first_entry(&ci->i_cap_snaps,
					   struct ceph_cap_snap, ci_item);
		__ceph_remove_capsnap(inode, capsnap, NULL, NULL);
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
		capsnap_release++;
	}
	wake_up_all(&ci->i_cap_wq);
	wake_up_all(&mdsc->cap_flushing_wq);
	return capsnap_release;
}

int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_client *cl = fsc->client;
	struct ceph_inode_info *ci = ceph_inode(inode);
	bool is_auth;
	bool dirty_dropped = false;
	int iputs = 0;

	lockdep_assert_held(&ci->i_ceph_lock);

	doutc(cl, "removing cap %p, ci is %p, %p %llx.%llx\n",
	      cap, ci, inode, ceph_vinop(inode));

	is_auth = (cap == ci->i_auth_cap);
	__ceph_remove_cap(cap, false);
	if (is_auth) {
		struct ceph_cap_flush *cf;

		if (ceph_inode_is_shutdown(inode)) {
			if (inode->i_data.nrpages > 0)
				*invalidate = true;
			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		/* trash all of the cap flushes for this inode */
		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_del_init(&cf->g_list);
			list_del_init(&cf->i_list);
			if (!cf->is_capsnap)
				ceph_free_cap_flush(cf);
		}

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited_client(cl,
				" dropping dirty %s state for %p %llx.%llx\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_vinop(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited_client(cl,
				" dropping dirty+flushing %s state for %p %llx.%llx\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_vinop(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			mapping_set_error(inode->i_mapping, -EIO);

			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscalls return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited_client(cl,
				" dropping file locks for %p %llx.%llx\n",
				inode, ceph_vinop(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			cf = ci->i_prealloc_cap_flush;
			ci->i_prealloc_cap_flush = NULL;
			if (!cf->is_capsnap)
				ceph_free_cap_flush(cf);
		}

		if (!list_empty(&ci->i_cap_snaps))
			iputs = remove_capsnaps(mdsc, inode);
	}
	if (dirty_dropped)
		++iputs;
	return iputs;
}