Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
awilliam
GitHub Repository: awilliam/linux-vfio
Path: blob/master/fs/ceph/caps.c
15111 views
1
#include <linux/ceph/ceph_debug.h>
2
3
#include <linux/fs.h>
4
#include <linux/kernel.h>
5
#include <linux/sched.h>
6
#include <linux/slab.h>
7
#include <linux/vmalloc.h>
8
#include <linux/wait.h>
9
#include <linux/writeback.h>
10
11
#include "super.h"
12
#include "mds_client.h"
13
#include <linux/ceph/decode.h>
14
#include <linux/ceph/messenger.h>
15
16
/*
17
* Capability management
18
*
19
* The Ceph metadata servers control client access to inode metadata
20
* and file data by issuing capabilities, granting clients permission
21
* to read and/or write both inode fields and file data to OSDs
22
* (storage nodes). Each capability consists of a set of bits
23
* indicating which operations are allowed.
24
*
25
* If the client holds a *_SHARED cap, the client has a coherent value
26
* that can be safely read from the cached inode.
27
*
28
* In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the
29
* client is allowed to change inode attributes (e.g., file size,
30
* mtime), note its dirty state in the ceph_cap, and asynchronously
31
* flush that metadata change to the MDS.
32
*
33
* In the event of a conflicting operation (perhaps by another
34
* client), the MDS will revoke the conflicting client capabilities.
35
*
36
* In order for a client to cache an inode, it must hold a capability
37
* with at least one MDS server. When inodes are released, release
38
* notifications are batched and periodically sent en masse to the MDS
39
* cluster to release server state.
40
*/
41
42
43
/*
 * Scratch storage for the readable cap strings produced for debugging
 * output: a ring of MAX_CAP_STR fixed-size buffers that
 * ceph_cap_string() hands out in rotation.
 */
#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static int last_cap_str;		/* next ring slot to hand out */
static DEFINE_SPINLOCK(cap_str_lock);	/* serializes last_cap_str updates */
50
51
static char *gcap_string(char *s, int c)
52
{
53
if (c & CEPH_CAP_GSHARED)
54
*s++ = 's';
55
if (c & CEPH_CAP_GEXCL)
56
*s++ = 'x';
57
if (c & CEPH_CAP_GCACHE)
58
*s++ = 'c';
59
if (c & CEPH_CAP_GRD)
60
*s++ = 'r';
61
if (c & CEPH_CAP_GWR)
62
*s++ = 'w';
63
if (c & CEPH_CAP_GBUFFER)
64
*s++ = 'b';
65
if (c & CEPH_CAP_GLAZYIO)
66
*s++ = 'l';
67
return s;
68
}
69
70
const char *ceph_cap_string(int caps)
71
{
72
int i;
73
char *s;
74
int c;
75
76
spin_lock(&cap_str_lock);
77
i = last_cap_str++;
78
if (last_cap_str == MAX_CAP_STR)
79
last_cap_str = 0;
80
spin_unlock(&cap_str_lock);
81
82
s = cap_str[i];
83
84
if (caps & CEPH_CAP_PIN)
85
*s++ = 'p';
86
87
c = (caps >> CEPH_CAP_SAUTH) & 3;
88
if (c) {
89
*s++ = 'A';
90
s = gcap_string(s, c);
91
}
92
93
c = (caps >> CEPH_CAP_SLINK) & 3;
94
if (c) {
95
*s++ = 'L';
96
s = gcap_string(s, c);
97
}
98
99
c = (caps >> CEPH_CAP_SXATTR) & 3;
100
if (c) {
101
*s++ = 'X';
102
s = gcap_string(s, c);
103
}
104
105
c = caps >> CEPH_CAP_SFILE;
106
if (c) {
107
*s++ = 'F';
108
s = gcap_string(s, c);
109
}
110
111
if (s == cap_str[i])
112
*s++ = '-';
113
*s = 0;
114
return cap_str[i];
115
}
116
117
void ceph_caps_init(struct ceph_mds_client *mdsc)
118
{
119
INIT_LIST_HEAD(&mdsc->caps_list);
120
spin_lock_init(&mdsc->caps_list_lock);
121
}
122
123
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
124
{
125
struct ceph_cap *cap;
126
127
spin_lock(&mdsc->caps_list_lock);
128
while (!list_empty(&mdsc->caps_list)) {
129
cap = list_first_entry(&mdsc->caps_list,
130
struct ceph_cap, caps_item);
131
list_del(&cap->caps_item);
132
kmem_cache_free(ceph_cap_cachep, cap);
133
}
134
mdsc->caps_total_count = 0;
135
mdsc->caps_avail_count = 0;
136
mdsc->caps_use_count = 0;
137
mdsc->caps_reserve_count = 0;
138
mdsc->caps_min_count = 0;
139
spin_unlock(&mdsc->caps_list_lock);
140
}
141
142
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
143
{
144
spin_lock(&mdsc->caps_list_lock);
145
mdsc->caps_min_count += delta;
146
BUG_ON(mdsc->caps_min_count < 0);
147
spin_unlock(&mdsc->caps_list_lock);
148
}
149
150
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
151
struct ceph_cap_reservation *ctx, int need)
152
{
153
int i;
154
struct ceph_cap *cap;
155
int have;
156
int alloc = 0;
157
LIST_HEAD(newcaps);
158
int ret = 0;
159
160
dout("reserve caps ctx=%p need=%d\n", ctx, need);
161
162
/* first reserve any caps that are already allocated */
163
spin_lock(&mdsc->caps_list_lock);
164
if (mdsc->caps_avail_count >= need)
165
have = need;
166
else
167
have = mdsc->caps_avail_count;
168
mdsc->caps_avail_count -= have;
169
mdsc->caps_reserve_count += have;
170
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
171
mdsc->caps_reserve_count +
172
mdsc->caps_avail_count);
173
spin_unlock(&mdsc->caps_list_lock);
174
175
for (i = have; i < need; i++) {
176
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
177
if (!cap) {
178
ret = -ENOMEM;
179
goto out_alloc_count;
180
}
181
list_add(&cap->caps_item, &newcaps);
182
alloc++;
183
}
184
BUG_ON(have + alloc != need);
185
186
spin_lock(&mdsc->caps_list_lock);
187
mdsc->caps_total_count += alloc;
188
mdsc->caps_reserve_count += alloc;
189
list_splice(&newcaps, &mdsc->caps_list);
190
191
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
192
mdsc->caps_reserve_count +
193
mdsc->caps_avail_count);
194
spin_unlock(&mdsc->caps_list_lock);
195
196
ctx->count = need;
197
dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
198
ctx, mdsc->caps_total_count, mdsc->caps_use_count,
199
mdsc->caps_reserve_count, mdsc->caps_avail_count);
200
return 0;
201
202
out_alloc_count:
203
/* we didn't manage to reserve as much as we needed */
204
pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
205
ctx, need, have);
206
return ret;
207
}
208
209
int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
210
struct ceph_cap_reservation *ctx)
211
{
212
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
213
if (ctx->count) {
214
spin_lock(&mdsc->caps_list_lock);
215
BUG_ON(mdsc->caps_reserve_count < ctx->count);
216
mdsc->caps_reserve_count -= ctx->count;
217
mdsc->caps_avail_count += ctx->count;
218
ctx->count = 0;
219
dout("unreserve caps %d = %d used + %d resv + %d avail\n",
220
mdsc->caps_total_count, mdsc->caps_use_count,
221
mdsc->caps_reserve_count, mdsc->caps_avail_count);
222
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
223
mdsc->caps_reserve_count +
224
mdsc->caps_avail_count);
225
spin_unlock(&mdsc->caps_list_lock);
226
}
227
return 0;
228
}
229
230
static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
231
struct ceph_cap_reservation *ctx)
232
{
233
struct ceph_cap *cap = NULL;
234
235
/* temporary, until we do something about cap import/export */
236
if (!ctx) {
237
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
238
if (cap) {
239
mdsc->caps_use_count++;
240
mdsc->caps_total_count++;
241
}
242
return cap;
243
}
244
245
spin_lock(&mdsc->caps_list_lock);
246
dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
247
ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
248
mdsc->caps_reserve_count, mdsc->caps_avail_count);
249
BUG_ON(!ctx->count);
250
BUG_ON(ctx->count > mdsc->caps_reserve_count);
251
BUG_ON(list_empty(&mdsc->caps_list));
252
253
ctx->count--;
254
mdsc->caps_reserve_count--;
255
mdsc->caps_use_count++;
256
257
cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
258
list_del(&cap->caps_item);
259
260
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
261
mdsc->caps_reserve_count + mdsc->caps_avail_count);
262
spin_unlock(&mdsc->caps_list_lock);
263
return cap;
264
}
265
266
void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
267
{
268
spin_lock(&mdsc->caps_list_lock);
269
dout("put_cap %p %d = %d used + %d resv + %d avail\n",
270
cap, mdsc->caps_total_count, mdsc->caps_use_count,
271
mdsc->caps_reserve_count, mdsc->caps_avail_count);
272
mdsc->caps_use_count--;
273
/*
274
* Keep some preallocated caps around (ceph_min_count), to
275
* avoid lots of free/alloc churn.
276
*/
277
if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
278
mdsc->caps_min_count) {
279
mdsc->caps_total_count--;
280
kmem_cache_free(ceph_cap_cachep, cap);
281
} else {
282
mdsc->caps_avail_count++;
283
list_add(&cap->caps_item, &mdsc->caps_list);
284
}
285
286
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
287
mdsc->caps_reserve_count + mdsc->caps_avail_count);
288
spin_unlock(&mdsc->caps_list_lock);
289
}
290
291
void ceph_reservation_status(struct ceph_fs_client *fsc,
292
int *total, int *avail, int *used, int *reserved,
293
int *min)
294
{
295
struct ceph_mds_client *mdsc = fsc->mdsc;
296
297
if (total)
298
*total = mdsc->caps_total_count;
299
if (avail)
300
*avail = mdsc->caps_avail_count;
301
if (used)
302
*used = mdsc->caps_use_count;
303
if (reserved)
304
*reserved = mdsc->caps_reserve_count;
305
if (min)
306
*min = mdsc->caps_min_count;
307
}
308
309
/*
310
* Find ceph_cap for given mds, if any.
311
*
312
* Called with i_lock held.
313
*/
314
static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
315
{
316
struct ceph_cap *cap;
317
struct rb_node *n = ci->i_caps.rb_node;
318
319
while (n) {
320
cap = rb_entry(n, struct ceph_cap, ci_node);
321
if (mds < cap->mds)
322
n = n->rb_left;
323
else if (mds > cap->mds)
324
n = n->rb_right;
325
else
326
return cap;
327
}
328
return NULL;
329
}
330
331
struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
332
{
333
struct ceph_cap *cap;
334
335
spin_lock(&ci->vfs_inode.i_lock);
336
cap = __get_cap_for_mds(ci, mds);
337
spin_unlock(&ci->vfs_inode.i_lock);
338
return cap;
339
}
340
341
/*
342
* Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
343
*/
344
static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
345
{
346
struct ceph_cap *cap;
347
int mds = -1;
348
struct rb_node *p;
349
350
/* prefer mds with WR|BUFFER|EXCL caps */
351
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
352
cap = rb_entry(p, struct ceph_cap, ci_node);
353
mds = cap->mds;
354
if (cap->issued & (CEPH_CAP_FILE_WR |
355
CEPH_CAP_FILE_BUFFER |
356
CEPH_CAP_FILE_EXCL))
357
break;
358
}
359
return mds;
360
}
361
362
int ceph_get_cap_mds(struct inode *inode)
363
{
364
int mds;
365
spin_lock(&inode->i_lock);
366
mds = __ceph_get_cap_mds(ceph_inode(inode));
367
spin_unlock(&inode->i_lock);
368
return mds;
369
}
370
371
/*
372
* Called under i_lock.
373
*/
374
static void __insert_cap_node(struct ceph_inode_info *ci,
375
struct ceph_cap *new)
376
{
377
struct rb_node **p = &ci->i_caps.rb_node;
378
struct rb_node *parent = NULL;
379
struct ceph_cap *cap = NULL;
380
381
while (*p) {
382
parent = *p;
383
cap = rb_entry(parent, struct ceph_cap, ci_node);
384
if (new->mds < cap->mds)
385
p = &(*p)->rb_left;
386
else if (new->mds > cap->mds)
387
p = &(*p)->rb_right;
388
else
389
BUG();
390
}
391
392
rb_link_node(&new->ci_node, parent, p);
393
rb_insert_color(&new->ci_node, &ci->i_caps);
394
}
395
396
/*
397
* (re)set cap hold timeouts, which control the delayed release
398
* of unused caps back to the MDS. Should be called on cap use.
399
*/
400
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
401
struct ceph_inode_info *ci)
402
{
403
struct ceph_mount_options *ma = mdsc->fsc->mount_options;
404
405
ci->i_hold_caps_min = round_jiffies(jiffies +
406
ma->caps_wanted_delay_min * HZ);
407
ci->i_hold_caps_max = round_jiffies(jiffies +
408
ma->caps_wanted_delay_max * HZ);
409
dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
410
ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
411
}
412
413
/*
414
* (Re)queue cap at the end of the delayed cap release list.
415
*
416
* If I_FLUSH is set, leave the inode at the front of the list.
417
*
418
* Caller holds i_lock
419
* -> we take mdsc->cap_delay_lock
420
*/
421
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
422
struct ceph_inode_info *ci)
423
{
424
__cap_set_timeouts(mdsc, ci);
425
dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
426
ci->i_ceph_flags, ci->i_hold_caps_max);
427
if (!mdsc->stopping) {
428
spin_lock(&mdsc->cap_delay_lock);
429
if (!list_empty(&ci->i_cap_delay_list)) {
430
if (ci->i_ceph_flags & CEPH_I_FLUSH)
431
goto no_change;
432
list_del_init(&ci->i_cap_delay_list);
433
}
434
list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
435
no_change:
436
spin_unlock(&mdsc->cap_delay_lock);
437
}
438
}
439
440
/*
441
* Queue an inode for immediate writeback. Mark inode with I_FLUSH,
442
* indicating we should send a cap message to flush dirty metadata
443
* asap, and move to the front of the delayed cap list.
444
*/
445
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
446
struct ceph_inode_info *ci)
447
{
448
dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode);
449
spin_lock(&mdsc->cap_delay_lock);
450
ci->i_ceph_flags |= CEPH_I_FLUSH;
451
if (!list_empty(&ci->i_cap_delay_list))
452
list_del_init(&ci->i_cap_delay_list);
453
list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
454
spin_unlock(&mdsc->cap_delay_lock);
455
}
456
457
/*
458
* Cancel delayed work on cap.
459
*
460
* Caller must hold i_lock.
461
*/
462
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
463
struct ceph_inode_info *ci)
464
{
465
dout("__cap_delay_cancel %p\n", &ci->vfs_inode);
466
if (list_empty(&ci->i_cap_delay_list))
467
return;
468
spin_lock(&mdsc->cap_delay_lock);
469
list_del_init(&ci->i_cap_delay_list);
470
spin_unlock(&mdsc->cap_delay_lock);
471
}
472
473
/*
474
* Common issue checks for add_cap, handle_cap_grant.
475
*/
476
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
477
unsigned issued)
478
{
479
unsigned had = __ceph_caps_issued(ci, NULL);
480
481
/*
482
* Each time we receive FILE_CACHE anew, we increment
483
* i_rdcache_gen.
484
*/
485
if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
486
(had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
487
ci->i_rdcache_gen++;
488
489
/*
490
* if we are newly issued FILE_SHARED, clear I_COMPLETE; we
491
* don't know what happened to this directory while we didn't
492
* have the cap.
493
*/
494
if ((issued & CEPH_CAP_FILE_SHARED) &&
495
(had & CEPH_CAP_FILE_SHARED) == 0) {
496
ci->i_shared_gen++;
497
if (S_ISDIR(ci->vfs_inode.i_mode)) {
498
dout(" marking %p NOT complete\n", &ci->vfs_inode);
499
ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
500
}
501
}
502
}
503
504
/*
505
* Add a capability under the given MDS session.
506
*
507
* Caller should hold session snap_rwsem (read) and s_mutex.
508
*
509
* @fmode is the open file mode, if we are opening a file, otherwise
510
* it is < 0. (This is so we can atomically add the cap and add an
511
* open file reference to it.)
512
*/
513
int ceph_add_cap(struct inode *inode,
514
struct ceph_mds_session *session, u64 cap_id,
515
int fmode, unsigned issued, unsigned wanted,
516
unsigned seq, unsigned mseq, u64 realmino, int flags,
517
struct ceph_cap_reservation *caps_reservation)
518
{
519
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
520
struct ceph_inode_info *ci = ceph_inode(inode);
521
struct ceph_cap *new_cap = NULL;
522
struct ceph_cap *cap;
523
int mds = session->s_mds;
524
int actual_wanted;
525
526
dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
527
session->s_mds, cap_id, ceph_cap_string(issued), seq);
528
529
/*
530
* If we are opening the file, include file mode wanted bits
531
* in wanted.
532
*/
533
if (fmode >= 0)
534
wanted |= ceph_caps_for_mode(fmode);
535
536
retry:
537
spin_lock(&inode->i_lock);
538
cap = __get_cap_for_mds(ci, mds);
539
if (!cap) {
540
if (new_cap) {
541
cap = new_cap;
542
new_cap = NULL;
543
} else {
544
spin_unlock(&inode->i_lock);
545
new_cap = get_cap(mdsc, caps_reservation);
546
if (new_cap == NULL)
547
return -ENOMEM;
548
goto retry;
549
}
550
551
cap->issued = 0;
552
cap->implemented = 0;
553
cap->mds = mds;
554
cap->mds_wanted = 0;
555
556
cap->ci = ci;
557
__insert_cap_node(ci, cap);
558
559
/* clear out old exporting info? (i.e. on cap import) */
560
if (ci->i_cap_exporting_mds == mds) {
561
ci->i_cap_exporting_issued = 0;
562
ci->i_cap_exporting_mseq = 0;
563
ci->i_cap_exporting_mds = -1;
564
}
565
566
/* add to session cap list */
567
cap->session = session;
568
spin_lock(&session->s_cap_lock);
569
list_add_tail(&cap->session_caps, &session->s_caps);
570
session->s_nr_caps++;
571
spin_unlock(&session->s_cap_lock);
572
} else if (new_cap)
573
ceph_put_cap(mdsc, new_cap);
574
575
if (!ci->i_snap_realm) {
576
/*
577
* add this inode to the appropriate snap realm
578
*/
579
struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
580
realmino);
581
if (realm) {
582
ceph_get_snap_realm(mdsc, realm);
583
spin_lock(&realm->inodes_with_caps_lock);
584
ci->i_snap_realm = realm;
585
list_add(&ci->i_snap_realm_item,
586
&realm->inodes_with_caps);
587
spin_unlock(&realm->inodes_with_caps_lock);
588
} else {
589
pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
590
realmino);
591
WARN_ON(!realm);
592
}
593
}
594
595
__check_cap_issue(ci, cap, issued);
596
597
/*
598
* If we are issued caps we don't want, or the mds' wanted
599
* value appears to be off, queue a check so we'll release
600
* later and/or update the mds wanted value.
601
*/
602
actual_wanted = __ceph_caps_wanted(ci);
603
if ((wanted & ~actual_wanted) ||
604
(issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
605
dout(" issued %s, mds wanted %s, actual %s, queueing\n",
606
ceph_cap_string(issued), ceph_cap_string(wanted),
607
ceph_cap_string(actual_wanted));
608
__cap_delay_requeue(mdsc, ci);
609
}
610
611
if (flags & CEPH_CAP_FLAG_AUTH)
612
ci->i_auth_cap = cap;
613
else if (ci->i_auth_cap == cap)
614
ci->i_auth_cap = NULL;
615
616
dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
617
inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
618
ceph_cap_string(issued|cap->issued), seq, mds);
619
cap->cap_id = cap_id;
620
cap->issued = issued;
621
cap->implemented |= issued;
622
cap->mds_wanted |= wanted;
623
cap->seq = seq;
624
cap->issue_seq = seq;
625
cap->mseq = mseq;
626
cap->cap_gen = session->s_cap_gen;
627
628
if (fmode >= 0)
629
__ceph_get_fmode(ci, fmode);
630
spin_unlock(&inode->i_lock);
631
wake_up_all(&ci->i_cap_wq);
632
return 0;
633
}
634
635
/*
636
* Return true if cap has not timed out and belongs to the current
637
* generation of the MDS session (i.e. has not gone 'stale' due to
638
* us losing touch with the mds).
639
*/
640
static int __cap_is_valid(struct ceph_cap *cap)
641
{
642
unsigned long ttl;
643
u32 gen;
644
645
spin_lock(&cap->session->s_cap_lock);
646
gen = cap->session->s_cap_gen;
647
ttl = cap->session->s_cap_ttl;
648
spin_unlock(&cap->session->s_cap_lock);
649
650
if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
651
dout("__cap_is_valid %p cap %p issued %s "
652
"but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
653
cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
654
return 0;
655
}
656
657
return 1;
658
}
659
660
/*
661
* Return set of valid cap bits issued to us. Note that caps time
662
* out, and may be invalidated in bulk if the client session times out
663
* and session->s_cap_gen is bumped.
664
*/
665
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
666
{
667
int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
668
struct ceph_cap *cap;
669
struct rb_node *p;
670
671
if (implemented)
672
*implemented = 0;
673
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
674
cap = rb_entry(p, struct ceph_cap, ci_node);
675
if (!__cap_is_valid(cap))
676
continue;
677
dout("__ceph_caps_issued %p cap %p issued %s\n",
678
&ci->vfs_inode, cap, ceph_cap_string(cap->issued));
679
have |= cap->issued;
680
if (implemented)
681
*implemented |= cap->implemented;
682
}
683
return have;
684
}
685
686
/*
687
* Get cap bits issued by caps other than @ocap
688
*/
689
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
690
{
691
int have = ci->i_snap_caps;
692
struct ceph_cap *cap;
693
struct rb_node *p;
694
695
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
696
cap = rb_entry(p, struct ceph_cap, ci_node);
697
if (cap == ocap)
698
continue;
699
if (!__cap_is_valid(cap))
700
continue;
701
have |= cap->issued;
702
}
703
return have;
704
}
705
706
/*
707
* Move a cap to the end of the LRU (oldest caps at list head, newest
708
* at list tail).
709
*/
710
static void __touch_cap(struct ceph_cap *cap)
711
{
712
struct ceph_mds_session *s = cap->session;
713
714
spin_lock(&s->s_cap_lock);
715
if (s->s_cap_iterator == NULL) {
716
dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
717
s->s_mds);
718
list_move_tail(&cap->session_caps, &s->s_caps);
719
} else {
720
dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
721
&cap->ci->vfs_inode, cap, s->s_mds);
722
}
723
spin_unlock(&s->s_cap_lock);
724
}
725
726
/*
727
* Check if we hold the given mask. If so, move the cap(s) to the
728
* front of their respective LRUs. (This is the preferred way for
729
* callers to check for caps they want.)
730
*/
731
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
732
{
733
struct ceph_cap *cap;
734
struct rb_node *p;
735
int have = ci->i_snap_caps;
736
737
if ((have & mask) == mask) {
738
dout("__ceph_caps_issued_mask %p snap issued %s"
739
" (mask %s)\n", &ci->vfs_inode,
740
ceph_cap_string(have),
741
ceph_cap_string(mask));
742
return 1;
743
}
744
745
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
746
cap = rb_entry(p, struct ceph_cap, ci_node);
747
if (!__cap_is_valid(cap))
748
continue;
749
if ((cap->issued & mask) == mask) {
750
dout("__ceph_caps_issued_mask %p cap %p issued %s"
751
" (mask %s)\n", &ci->vfs_inode, cap,
752
ceph_cap_string(cap->issued),
753
ceph_cap_string(mask));
754
if (touch)
755
__touch_cap(cap);
756
return 1;
757
}
758
759
/* does a combination of caps satisfy mask? */
760
have |= cap->issued;
761
if ((have & mask) == mask) {
762
dout("__ceph_caps_issued_mask %p combo issued %s"
763
" (mask %s)\n", &ci->vfs_inode,
764
ceph_cap_string(cap->issued),
765
ceph_cap_string(mask));
766
if (touch) {
767
struct rb_node *q;
768
769
/* touch this + preceding caps */
770
__touch_cap(cap);
771
for (q = rb_first(&ci->i_caps); q != p;
772
q = rb_next(q)) {
773
cap = rb_entry(q, struct ceph_cap,
774
ci_node);
775
if (!__cap_is_valid(cap))
776
continue;
777
__touch_cap(cap);
778
}
779
}
780
return 1;
781
}
782
}
783
784
return 0;
785
}
786
787
/*
788
* Return true if mask caps are currently being revoked by an MDS.
789
*/
790
int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
791
{
792
struct inode *inode = &ci->vfs_inode;
793
struct ceph_cap *cap;
794
struct rb_node *p;
795
int ret = 0;
796
797
spin_lock(&inode->i_lock);
798
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
799
cap = rb_entry(p, struct ceph_cap, ci_node);
800
if (__cap_is_valid(cap) &&
801
(cap->implemented & ~cap->issued & mask)) {
802
ret = 1;
803
break;
804
}
805
}
806
spin_unlock(&inode->i_lock);
807
dout("ceph_caps_revoking %p %s = %d\n", inode,
808
ceph_cap_string(mask), ret);
809
return ret;
810
}
811
812
int __ceph_caps_used(struct ceph_inode_info *ci)
813
{
814
int used = 0;
815
if (ci->i_pin_ref)
816
used |= CEPH_CAP_PIN;
817
if (ci->i_rd_ref)
818
used |= CEPH_CAP_FILE_RD;
819
if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
820
used |= CEPH_CAP_FILE_CACHE;
821
if (ci->i_wr_ref)
822
used |= CEPH_CAP_FILE_WR;
823
if (ci->i_wb_ref || ci->i_wrbuffer_ref)
824
used |= CEPH_CAP_FILE_BUFFER;
825
return used;
826
}
827
828
/*
829
* wanted, by virtue of open file modes
830
*/
831
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
832
{
833
int want = 0;
834
int mode;
835
for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
836
if (ci->i_nr_by_mode[mode])
837
want |= ceph_caps_for_mode(mode);
838
return want;
839
}
840
841
/*
842
* Return caps we have registered with the MDS(s) as 'wanted'.
843
*/
844
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci)
845
{
846
struct ceph_cap *cap;
847
struct rb_node *p;
848
int mds_wanted = 0;
849
850
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
851
cap = rb_entry(p, struct ceph_cap, ci_node);
852
if (!__cap_is_valid(cap))
853
continue;
854
mds_wanted |= cap->mds_wanted;
855
}
856
return mds_wanted;
857
}
858
859
/*
860
* called under i_lock
861
*/
862
static int __ceph_is_any_caps(struct ceph_inode_info *ci)
863
{
864
return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0;
865
}
866
867
/*
868
* Remove a cap. Take steps to deal with a racing iterate_session_caps.
869
*
870
* caller should hold i_lock.
871
* caller will not hold session s_mutex if called from destroy_inode.
872
*/
873
void __ceph_remove_cap(struct ceph_cap *cap)
874
{
875
struct ceph_mds_session *session = cap->session;
876
struct ceph_inode_info *ci = cap->ci;
877
struct ceph_mds_client *mdsc =
878
ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
879
int removed = 0;
880
881
dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
882
883
/* remove from session list */
884
spin_lock(&session->s_cap_lock);
885
if (session->s_cap_iterator == cap) {
886
/* not yet, we are iterating over this very cap */
887
dout("__ceph_remove_cap delaying %p removal from session %p\n",
888
cap, cap->session);
889
} else {
890
list_del_init(&cap->session_caps);
891
session->s_nr_caps--;
892
cap->session = NULL;
893
removed = 1;
894
}
895
/* protect backpointer with s_cap_lock: see iterate_session_caps */
896
cap->ci = NULL;
897
spin_unlock(&session->s_cap_lock);
898
899
/* remove from inode list */
900
rb_erase(&cap->ci_node, &ci->i_caps);
901
if (ci->i_auth_cap == cap)
902
ci->i_auth_cap = NULL;
903
904
if (removed)
905
ceph_put_cap(mdsc, cap);
906
907
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
908
struct ceph_snap_realm *realm = ci->i_snap_realm;
909
spin_lock(&realm->inodes_with_caps_lock);
910
list_del_init(&ci->i_snap_realm_item);
911
ci->i_snap_realm_counter++;
912
ci->i_snap_realm = NULL;
913
spin_unlock(&realm->inodes_with_caps_lock);
914
ceph_put_snap_realm(mdsc, realm);
915
}
916
if (!__ceph_is_any_real_caps(ci))
917
__cap_delay_cancel(mdsc, ci);
918
}
919
920
/*
921
* Build and send a cap message to the given MDS.
922
*
923
* Caller should be holding s_mutex.
924
*/
925
static int send_cap_msg(struct ceph_mds_session *session,
926
u64 ino, u64 cid, int op,
927
int caps, int wanted, int dirty,
928
u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
929
u64 size, u64 max_size,
930
struct timespec *mtime, struct timespec *atime,
931
u64 time_warp_seq,
932
uid_t uid, gid_t gid, mode_t mode,
933
u64 xattr_version,
934
struct ceph_buffer *xattrs_buf,
935
u64 follows)
936
{
937
struct ceph_mds_caps *fc;
938
struct ceph_msg *msg;
939
940
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
941
" seq %u/%u mseq %u follows %lld size %llu/%llu"
942
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
943
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
944
ceph_cap_string(dirty),
945
seq, issue_seq, mseq, follows, size, max_size,
946
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
947
948
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
949
if (!msg)
950
return -ENOMEM;
951
952
msg->hdr.tid = cpu_to_le64(flush_tid);
953
954
fc = msg->front.iov_base;
955
memset(fc, 0, sizeof(*fc));
956
957
fc->cap_id = cpu_to_le64(cid);
958
fc->op = cpu_to_le32(op);
959
fc->seq = cpu_to_le32(seq);
960
fc->issue_seq = cpu_to_le32(issue_seq);
961
fc->migrate_seq = cpu_to_le32(mseq);
962
fc->caps = cpu_to_le32(caps);
963
fc->wanted = cpu_to_le32(wanted);
964
fc->dirty = cpu_to_le32(dirty);
965
fc->ino = cpu_to_le64(ino);
966
fc->snap_follows = cpu_to_le64(follows);
967
968
fc->size = cpu_to_le64(size);
969
fc->max_size = cpu_to_le64(max_size);
970
if (mtime)
971
ceph_encode_timespec(&fc->mtime, mtime);
972
if (atime)
973
ceph_encode_timespec(&fc->atime, atime);
974
fc->time_warp_seq = cpu_to_le32(time_warp_seq);
975
976
fc->uid = cpu_to_le32(uid);
977
fc->gid = cpu_to_le32(gid);
978
fc->mode = cpu_to_le32(mode);
979
980
fc->xattr_version = cpu_to_le64(xattr_version);
981
if (xattrs_buf) {
982
msg->middle = ceph_buffer_get(xattrs_buf);
983
fc->xattr_len = cpu_to_le32(xattrs_buf->vec.iov_len);
984
msg->hdr.middle_len = cpu_to_le32(xattrs_buf->vec.iov_len);
985
}
986
987
ceph_con_send(&session->s_con, msg);
988
return 0;
989
}
990
991
/*
 * Append one cap release record to the session's current release
 * message (the first message on s_cap_releases) and consume one of the
 * session's release slots.  When the message reaches
 * CEPH_CAPS_PER_RELEASE records it is moved to s_cap_releases_done.
 *
 * It is a bug to call this when the session has no release slots left
 * (s_num_cap_releases == 0).  Takes s_cap_lock.
 */
static void __queue_cap_release(struct ceph_mds_session *session,
				u64 ino, u64 cap_id, u32 migrate_seq,
				u32 issue_seq)
{
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_msg *msg;
	u32 count;

	spin_lock(&session->s_cap_lock);
	BUG_ON(!session->s_num_cap_releases);
	msg = list_first_entry(&session->s_cap_releases,
			       struct ceph_msg, list_head);

	dout(" adding %llx release to mds%d msg %p (%d left)\n",
	     ino, session->s_mds, msg, session->s_num_cap_releases);

	/* the new record must still fit in the message front */
	BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);

	/* bump the record count stored in the message header */
	head = msg->front.iov_base;
	count = le32_to_cpu(head->num) + 1;
	head->num = cpu_to_le32(count);

	/* the new record goes right after the data already present */
	item = msg->front.iov_base + msg->front.iov_len;
	item->ino = cpu_to_le64(ino);
	item->cap_id = cpu_to_le64(cap_id);
	item->migrate_seq = cpu_to_le32(migrate_seq);
	item->seq = cpu_to_le32(issue_seq);

	session->s_num_cap_releases--;

	msg->front.iov_len += sizeof(*item);
	if (count == CEPH_CAPS_PER_RELEASE) {
		dout(" release msg %p full\n", msg);
		list_move_tail(&msg->list_head, &session->s_cap_releases_done);
	} else {
		dout(" release msg %p at %d/%d (%d)\n", msg,
		     (int)count,
		     (int)CEPH_CAPS_PER_RELEASE,
		     (int)msg->front.iov_len);
	}
	spin_unlock(&session->s_cap_lock);
}
1030
1031
/*
1032
* Queue cap releases when an inode is dropped from our cache. Since
1033
* inode is about to be destroyed, there is no need for i_lock.
1034
*/
1035
void ceph_queue_caps_release(struct inode *inode)
1036
{
1037
struct ceph_inode_info *ci = ceph_inode(inode);
1038
struct rb_node *p;
1039
1040
p = rb_first(&ci->i_caps);
1041
while (p) {
1042
struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
1043
struct ceph_mds_session *session = cap->session;
1044
1045
__queue_cap_release(session, ceph_ino(inode), cap->cap_id,
1046
cap->mseq, cap->issue_seq);
1047
p = rb_next(p);
1048
__ceph_remove_cap(cap);
1049
}
1050
}
1051
1052
/*
1053
* Send a cap msg on the given inode. Update our caps state, then
1054
* drop i_lock and send the message.
1055
*
1056
* Make note of max_size reported/requested from mds, revoked caps
1057
* that have now been implemented.
1058
*
1059
* Make a half-hearted attempt to invalidate the page cache if we are
1060
* dropping RDCACHE. Note that this will leave behind locked pages
1061
* that we'll then need to deal with elsewhere.
1062
*
1063
* Return non-zero if delayed release, or we experienced an error
1064
* such that the caller should requeue + retry later.
1065
*
1066
* called with i_lock, then drops it.
1067
* caller should hold snap_rwsem (read), s_mutex.
1068
*/
1069
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1070
int op, int used, int want, int retain, int flushing,
1071
unsigned *pflush_tid)
1072
__releases(cap->ci->vfs_inode->i_lock)
1073
{
1074
struct ceph_inode_info *ci = cap->ci;
1075
struct inode *inode = &ci->vfs_inode;
1076
u64 cap_id = cap->cap_id;
1077
int held, revoking, dropping, keep;
1078
u64 seq, issue_seq, mseq, time_warp_seq, follows;
1079
u64 size, max_size;
1080
struct timespec mtime, atime;
1081
int wake = 0;
1082
mode_t mode;
1083
uid_t uid;
1084
gid_t gid;
1085
struct ceph_mds_session *session;
1086
u64 xattr_version = 0;
1087
struct ceph_buffer *xattr_blob = NULL;
1088
int delayed = 0;
1089
u64 flush_tid = 0;
1090
int i;
1091
int ret;
1092
1093
held = cap->issued | cap->implemented;
1094
revoking = cap->implemented & ~cap->issued;
1095
retain &= ~revoking;
1096
dropping = cap->issued & ~retain;
1097
1098
dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
1099
inode, cap, cap->session,
1100
ceph_cap_string(held), ceph_cap_string(held & retain),
1101
ceph_cap_string(revoking));
1102
BUG_ON((retain & CEPH_CAP_PIN) == 0);
1103
1104
session = cap->session;
1105
1106
/* don't release wanted unless we've waited a bit. */
1107
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
1108
time_before(jiffies, ci->i_hold_caps_min)) {
1109
dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
1110
ceph_cap_string(cap->issued),
1111
ceph_cap_string(cap->issued & retain),
1112
ceph_cap_string(cap->mds_wanted),
1113
ceph_cap_string(want));
1114
want |= cap->mds_wanted;
1115
retain |= cap->issued;
1116
delayed = 1;
1117
}
1118
ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
1119
1120
cap->issued &= retain; /* drop bits we don't want */
1121
if (cap->implemented & ~cap->issued) {
1122
/*
1123
* Wake up any waiters on wanted -> needed transition.
1124
* This is due to the weird transition from buffered
1125
* to sync IO... we need to flush dirty pages _before_
1126
* allowing sync writes to avoid reordering.
1127
*/
1128
wake = 1;
1129
}
1130
cap->implemented &= cap->issued | used;
1131
cap->mds_wanted = want;
1132
1133
if (flushing) {
1134
/*
1135
* assign a tid for flush operations so we can avoid
1136
* flush1 -> dirty1 -> flush2 -> flushack1 -> mark
1137
* clean type races. track latest tid for every bit
1138
* so we can handle flush AxFw, flush Fw, and have the
1139
* first ack clean Ax.
1140
*/
1141
flush_tid = ++ci->i_cap_flush_last_tid;
1142
if (pflush_tid)
1143
*pflush_tid = flush_tid;
1144
dout(" cap_flush_tid %d\n", (int)flush_tid);
1145
for (i = 0; i < CEPH_CAP_BITS; i++)
1146
if (flushing & (1 << i))
1147
ci->i_cap_flush_tid[i] = flush_tid;
1148
1149
follows = ci->i_head_snapc->seq;
1150
} else {
1151
follows = 0;
1152
}
1153
1154
keep = cap->implemented;
1155
seq = cap->seq;
1156
issue_seq = cap->issue_seq;
1157
mseq = cap->mseq;
1158
size = inode->i_size;
1159
ci->i_reported_size = size;
1160
max_size = ci->i_wanted_max_size;
1161
ci->i_requested_max_size = max_size;
1162
mtime = inode->i_mtime;
1163
atime = inode->i_atime;
1164
time_warp_seq = ci->i_time_warp_seq;
1165
uid = inode->i_uid;
1166
gid = inode->i_gid;
1167
mode = inode->i_mode;
1168
1169
if (flushing & CEPH_CAP_XATTR_EXCL) {
1170
__ceph_build_xattrs_blob(ci);
1171
xattr_blob = ci->i_xattrs.blob;
1172
xattr_version = ci->i_xattrs.version;
1173
}
1174
1175
spin_unlock(&inode->i_lock);
1176
1177
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
1178
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
1179
size, max_size, &mtime, &atime, time_warp_seq,
1180
uid, gid, mode, xattr_version, xattr_blob,
1181
follows);
1182
if (ret < 0) {
1183
dout("error sending cap msg, must requeue %p\n", inode);
1184
delayed = 1;
1185
}
1186
1187
if (wake)
1188
wake_up_all(&ci->i_cap_wq);
1189
1190
return delayed;
1191
}
1192
1193
/*
1194
* When a snapshot is taken, clients accumulate dirty metadata on
1195
* inodes with capabilities in ceph_cap_snaps to describe the file
1196
* state at the time the snapshot was taken. This must be flushed
1197
* asynchronously back to the MDS once sync writes complete and dirty
1198
* data is written out.
1199
*
1200
* Unless @again is true, skip cap_snaps that were already sent to
1201
* the MDS (i.e., during this session).
1202
*
1203
* Called under i_lock. Takes s_mutex as needed.
1204
*/
1205
void __ceph_flush_snaps(struct ceph_inode_info *ci,
1206
struct ceph_mds_session **psession,
1207
int again)
1208
__releases(ci->vfs_inode->i_lock)
1209
__acquires(ci->vfs_inode->i_lock)
1210
{
1211
struct inode *inode = &ci->vfs_inode;
1212
int mds;
1213
struct ceph_cap_snap *capsnap;
1214
u32 mseq;
1215
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1216
struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
1217
session->s_mutex */
1218
u64 next_follows = 0; /* keep track of how far we've gotten through the
1219
i_cap_snaps list, and skip these entries next time
1220
around to avoid an infinite loop */
1221
1222
if (psession)
1223
session = *psession;
1224
1225
dout("__flush_snaps %p\n", inode);
1226
retry:
1227
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
1228
/* avoid an infiniute loop after retry */
1229
if (capsnap->follows < next_follows)
1230
continue;
1231
/*
1232
* we need to wait for sync writes to complete and for dirty
1233
* pages to be written out.
1234
*/
1235
if (capsnap->dirty_pages || capsnap->writing)
1236
break;
1237
1238
/*
1239
* if cap writeback already occurred, we should have dropped
1240
* the capsnap in ceph_put_wrbuffer_cap_refs.
1241
*/
1242
BUG_ON(capsnap->dirty == 0);
1243
1244
/* pick mds, take s_mutex */
1245
if (ci->i_auth_cap == NULL) {
1246
dout("no auth cap (migrating?), doing nothing\n");
1247
goto out;
1248
}
1249
1250
/* only flush each capsnap once */
1251
if (!again && !list_empty(&capsnap->flushing_item)) {
1252
dout("already flushed %p, skipping\n", capsnap);
1253
continue;
1254
}
1255
1256
mds = ci->i_auth_cap->session->s_mds;
1257
mseq = ci->i_auth_cap->mseq;
1258
1259
if (session && session->s_mds != mds) {
1260
dout("oops, wrong session %p mutex\n", session);
1261
mutex_unlock(&session->s_mutex);
1262
ceph_put_mds_session(session);
1263
session = NULL;
1264
}
1265
if (!session) {
1266
spin_unlock(&inode->i_lock);
1267
mutex_lock(&mdsc->mutex);
1268
session = __ceph_lookup_mds_session(mdsc, mds);
1269
mutex_unlock(&mdsc->mutex);
1270
if (session) {
1271
dout("inverting session/ino locks on %p\n",
1272
session);
1273
mutex_lock(&session->s_mutex);
1274
}
1275
/*
1276
* if session == NULL, we raced against a cap
1277
* deletion or migration. retry, and we'll
1278
* get a better @mds value next time.
1279
*/
1280
spin_lock(&inode->i_lock);
1281
goto retry;
1282
}
1283
1284
capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
1285
atomic_inc(&capsnap->nref);
1286
if (!list_empty(&capsnap->flushing_item))
1287
list_del_init(&capsnap->flushing_item);
1288
list_add_tail(&capsnap->flushing_item,
1289
&session->s_cap_snaps_flushing);
1290
spin_unlock(&inode->i_lock);
1291
1292
dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
1293
inode, capsnap, capsnap->follows, capsnap->flush_tid);
1294
send_cap_msg(session, ceph_vino(inode).ino, 0,
1295
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
1296
capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
1297
capsnap->size, 0,
1298
&capsnap->mtime, &capsnap->atime,
1299
capsnap->time_warp_seq,
1300
capsnap->uid, capsnap->gid, capsnap->mode,
1301
capsnap->xattr_version, capsnap->xattr_blob,
1302
capsnap->follows);
1303
1304
next_follows = capsnap->follows + 1;
1305
ceph_put_cap_snap(capsnap);
1306
1307
spin_lock(&inode->i_lock);
1308
goto retry;
1309
}
1310
1311
/* we flushed them all; remove this inode from the queue */
1312
spin_lock(&mdsc->snap_flush_lock);
1313
list_del_init(&ci->i_snap_flush_item);
1314
spin_unlock(&mdsc->snap_flush_lock);
1315
1316
out:
1317
if (psession)
1318
*psession = session;
1319
else if (session) {
1320
mutex_unlock(&session->s_mutex);
1321
ceph_put_mds_session(session);
1322
}
1323
}
1324
1325
static void ceph_flush_snaps(struct ceph_inode_info *ci)
1326
{
1327
struct inode *inode = &ci->vfs_inode;
1328
1329
spin_lock(&inode->i_lock);
1330
__ceph_flush_snaps(ci, NULL, 0);
1331
spin_unlock(&inode->i_lock);
1332
}
1333
1334
/*
1335
* Mark caps dirty. If inode is newly dirty, return the dirty flags.
1336
* Caller is then responsible for calling __mark_inode_dirty with the
1337
* returned flags value.
1338
*/
1339
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
1340
{
1341
struct ceph_mds_client *mdsc =
1342
ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
1343
struct inode *inode = &ci->vfs_inode;
1344
int was = ci->i_dirty_caps;
1345
int dirty = 0;
1346
1347
dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
1348
ceph_cap_string(mask), ceph_cap_string(was),
1349
ceph_cap_string(was | mask));
1350
ci->i_dirty_caps |= mask;
1351
if (was == 0) {
1352
if (!ci->i_head_snapc)
1353
ci->i_head_snapc = ceph_get_snap_context(
1354
ci->i_snap_realm->cached_context);
1355
dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
1356
ci->i_head_snapc);
1357
BUG_ON(!list_empty(&ci->i_dirty_item));
1358
spin_lock(&mdsc->cap_dirty_lock);
1359
list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1360
spin_unlock(&mdsc->cap_dirty_lock);
1361
if (ci->i_flushing_caps == 0) {
1362
ihold(inode);
1363
dirty |= I_DIRTY_SYNC;
1364
}
1365
}
1366
BUG_ON(list_empty(&ci->i_dirty_item));
1367
if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
1368
(mask & CEPH_CAP_FILE_BUFFER))
1369
dirty |= I_DIRTY_DATASYNC;
1370
__cap_delay_requeue(mdsc, ci);
1371
return dirty;
1372
}
1373
1374
/*
1375
* Add dirty inode to the flushing list. Assigned a seq number so we
1376
* can wait for caps to flush without starving.
1377
*
1378
* Called under i_lock.
1379
*/
1380
static int __mark_caps_flushing(struct inode *inode,
1381
struct ceph_mds_session *session)
1382
{
1383
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1384
struct ceph_inode_info *ci = ceph_inode(inode);
1385
int flushing;
1386
1387
BUG_ON(ci->i_dirty_caps == 0);
1388
BUG_ON(list_empty(&ci->i_dirty_item));
1389
1390
flushing = ci->i_dirty_caps;
1391
dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
1392
ceph_cap_string(flushing),
1393
ceph_cap_string(ci->i_flushing_caps),
1394
ceph_cap_string(ci->i_flushing_caps | flushing));
1395
ci->i_flushing_caps |= flushing;
1396
ci->i_dirty_caps = 0;
1397
dout(" inode %p now !dirty\n", inode);
1398
1399
spin_lock(&mdsc->cap_dirty_lock);
1400
list_del_init(&ci->i_dirty_item);
1401
1402
ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
1403
if (list_empty(&ci->i_flushing_item)) {
1404
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1405
mdsc->num_cap_flushing++;
1406
dout(" inode %p now flushing seq %lld\n", inode,
1407
ci->i_cap_flush_seq);
1408
} else {
1409
list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1410
dout(" inode %p now flushing (more) seq %lld\n", inode,
1411
ci->i_cap_flush_seq);
1412
}
1413
spin_unlock(&mdsc->cap_dirty_lock);
1414
1415
return flushing;
1416
}
1417
1418
/*
1419
* try to invalidate mapping pages without blocking.
1420
*/
1421
static int try_nonblocking_invalidate(struct inode *inode)
1422
{
1423
struct ceph_inode_info *ci = ceph_inode(inode);
1424
u32 invalidating_gen = ci->i_rdcache_gen;
1425
1426
spin_unlock(&inode->i_lock);
1427
invalidate_mapping_pages(&inode->i_data, 0, -1);
1428
spin_lock(&inode->i_lock);
1429
1430
if (inode->i_data.nrpages == 0 &&
1431
invalidating_gen == ci->i_rdcache_gen) {
1432
/* success. */
1433
dout("try_nonblocking_invalidate %p success\n", inode);
1434
/* save any racing async invalidate some trouble */
1435
ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
1436
return 0;
1437
}
1438
dout("try_nonblocking_invalidate %p failed\n", inode);
1439
return -1;
1440
}
1441
1442
/*
1443
* Swiss army knife function to examine currently used and wanted
1444
* versus held caps. Release, flush, ack revoked caps to mds as
1445
* appropriate.
1446
*
1447
* CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
1448
* cap release further.
1449
* CHECK_CAPS_AUTHONLY - we should only check the auth cap
1450
* CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
1451
* further delay.
1452
*/
1453
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1454
struct ceph_mds_session *session)
1455
{
1456
struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
1457
struct ceph_mds_client *mdsc = fsc->mdsc;
1458
struct inode *inode = &ci->vfs_inode;
1459
struct ceph_cap *cap;
1460
int file_wanted, used;
1461
int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
1462
int issued, implemented, want, retain, revoking, flushing = 0;
1463
int mds = -1; /* keep track of how far we've gone through i_caps list
1464
to avoid an infinite loop on retry */
1465
struct rb_node *p;
1466
int tried_invalidate = 0;
1467
int delayed = 0, sent = 0, force_requeue = 0, num;
1468
int queue_invalidate = 0;
1469
int is_delayed = flags & CHECK_CAPS_NODELAY;
1470
1471
/* if we are unmounting, flush any unused caps immediately. */
1472
if (mdsc->stopping)
1473
is_delayed = 1;
1474
1475
spin_lock(&inode->i_lock);
1476
1477
if (ci->i_ceph_flags & CEPH_I_FLUSH)
1478
flags |= CHECK_CAPS_FLUSH;
1479
1480
/* flush snaps first time around only */
1481
if (!list_empty(&ci->i_cap_snaps))
1482
__ceph_flush_snaps(ci, &session, 0);
1483
goto retry_locked;
1484
retry:
1485
spin_lock(&inode->i_lock);
1486
retry_locked:
1487
file_wanted = __ceph_caps_file_wanted(ci);
1488
used = __ceph_caps_used(ci);
1489
want = file_wanted | used;
1490
issued = __ceph_caps_issued(ci, &implemented);
1491
revoking = implemented & ~issued;
1492
1493
retain = want | CEPH_CAP_PIN;
1494
if (!mdsc->stopping && inode->i_nlink > 0) {
1495
if (want) {
1496
retain |= CEPH_CAP_ANY; /* be greedy */
1497
} else {
1498
retain |= CEPH_CAP_ANY_SHARED;
1499
/*
1500
* keep RD only if we didn't have the file open RW,
1501
* because then the mds would revoke it anyway to
1502
* journal max_size=0.
1503
*/
1504
if (ci->i_max_size == 0)
1505
retain |= CEPH_CAP_ANY_RD;
1506
}
1507
}
1508
1509
dout("check_caps %p file_want %s used %s dirty %s flushing %s"
1510
" issued %s revoking %s retain %s %s%s%s\n", inode,
1511
ceph_cap_string(file_wanted),
1512
ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
1513
ceph_cap_string(ci->i_flushing_caps),
1514
ceph_cap_string(issued), ceph_cap_string(revoking),
1515
ceph_cap_string(retain),
1516
(flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
1517
(flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
1518
(flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
1519
1520
/*
1521
* If we no longer need to hold onto old our caps, and we may
1522
* have cached pages, but don't want them, then try to invalidate.
1523
* If we fail, it's because pages are locked.... try again later.
1524
*/
1525
if ((!is_delayed || mdsc->stopping) &&
1526
ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
1527
inode->i_data.nrpages && /* have cached pages */
1528
(file_wanted == 0 || /* no open files */
1529
(revoking & (CEPH_CAP_FILE_CACHE|
1530
CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */
1531
!tried_invalidate) {
1532
dout("check_caps trying to invalidate on %p\n", inode);
1533
if (try_nonblocking_invalidate(inode) < 0) {
1534
if (revoking & (CEPH_CAP_FILE_CACHE|
1535
CEPH_CAP_FILE_LAZYIO)) {
1536
dout("check_caps queuing invalidate\n");
1537
queue_invalidate = 1;
1538
ci->i_rdcache_revoking = ci->i_rdcache_gen;
1539
} else {
1540
dout("check_caps failed to invalidate pages\n");
1541
/* we failed to invalidate pages. check these
1542
caps again later. */
1543
force_requeue = 1;
1544
__cap_set_timeouts(mdsc, ci);
1545
}
1546
}
1547
tried_invalidate = 1;
1548
goto retry_locked;
1549
}
1550
1551
num = 0;
1552
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
1553
cap = rb_entry(p, struct ceph_cap, ci_node);
1554
num++;
1555
1556
/* avoid looping forever */
1557
if (mds >= cap->mds ||
1558
((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
1559
continue;
1560
1561
/* NOTE: no side-effects allowed, until we take s_mutex */
1562
1563
revoking = cap->implemented & ~cap->issued;
1564
dout(" mds%d cap %p issued %s implemented %s revoking %s\n",
1565
cap->mds, cap, ceph_cap_string(cap->issued),
1566
ceph_cap_string(cap->implemented),
1567
ceph_cap_string(revoking));
1568
1569
if (cap == ci->i_auth_cap &&
1570
(cap->issued & CEPH_CAP_FILE_WR)) {
1571
/* request larger max_size from MDS? */
1572
if (ci->i_wanted_max_size > ci->i_max_size &&
1573
ci->i_wanted_max_size > ci->i_requested_max_size) {
1574
dout("requesting new max_size\n");
1575
goto ack;
1576
}
1577
1578
/* approaching file_max? */
1579
if ((inode->i_size << 1) >= ci->i_max_size &&
1580
(ci->i_reported_size << 1) < ci->i_max_size) {
1581
dout("i_size approaching max_size\n");
1582
goto ack;
1583
}
1584
}
1585
/* flush anything dirty? */
1586
if (cap == ci->i_auth_cap && (flags & CHECK_CAPS_FLUSH) &&
1587
ci->i_dirty_caps) {
1588
dout("flushing dirty caps\n");
1589
goto ack;
1590
}
1591
1592
/* completed revocation? going down and there are no caps? */
1593
if (revoking && (revoking & used) == 0) {
1594
dout("completed revocation of %s\n",
1595
ceph_cap_string(cap->implemented & ~cap->issued));
1596
goto ack;
1597
}
1598
1599
/* want more caps from mds? */
1600
if (want & ~(cap->mds_wanted | cap->issued))
1601
goto ack;
1602
1603
/* things we might delay */
1604
if ((cap->issued & ~retain) == 0 &&
1605
cap->mds_wanted == want)
1606
continue; /* nope, all good */
1607
1608
if (is_delayed)
1609
goto ack;
1610
1611
/* delay? */
1612
if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
1613
time_before(jiffies, ci->i_hold_caps_max)) {
1614
dout(" delaying issued %s -> %s, wanted %s -> %s\n",
1615
ceph_cap_string(cap->issued),
1616
ceph_cap_string(cap->issued & retain),
1617
ceph_cap_string(cap->mds_wanted),
1618
ceph_cap_string(want));
1619
delayed++;
1620
continue;
1621
}
1622
1623
ack:
1624
if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
1625
dout(" skipping %p I_NOFLUSH set\n", inode);
1626
continue;
1627
}
1628
1629
if (session && session != cap->session) {
1630
dout("oops, wrong session %p mutex\n", session);
1631
mutex_unlock(&session->s_mutex);
1632
session = NULL;
1633
}
1634
if (!session) {
1635
session = cap->session;
1636
if (mutex_trylock(&session->s_mutex) == 0) {
1637
dout("inverting session/ino locks on %p\n",
1638
session);
1639
spin_unlock(&inode->i_lock);
1640
if (took_snap_rwsem) {
1641
up_read(&mdsc->snap_rwsem);
1642
took_snap_rwsem = 0;
1643
}
1644
mutex_lock(&session->s_mutex);
1645
goto retry;
1646
}
1647
}
1648
/* take snap_rwsem after session mutex */
1649
if (!took_snap_rwsem) {
1650
if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
1651
dout("inverting snap/in locks on %p\n",
1652
inode);
1653
spin_unlock(&inode->i_lock);
1654
down_read(&mdsc->snap_rwsem);
1655
took_snap_rwsem = 1;
1656
goto retry;
1657
}
1658
took_snap_rwsem = 1;
1659
}
1660
1661
if (cap == ci->i_auth_cap && ci->i_dirty_caps)
1662
flushing = __mark_caps_flushing(inode, session);
1663
else
1664
flushing = 0;
1665
1666
mds = cap->mds; /* remember mds, so we don't repeat */
1667
sent++;
1668
1669
/* __send_cap drops i_lock */
1670
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, used, want,
1671
retain, flushing, NULL);
1672
goto retry; /* retake i_lock and restart our cap scan. */
1673
}
1674
1675
/*
1676
* Reschedule delayed caps release if we delayed anything,
1677
* otherwise cancel.
1678
*/
1679
if (delayed && is_delayed)
1680
force_requeue = 1; /* __send_cap delayed release; requeue */
1681
if (!delayed && !is_delayed)
1682
__cap_delay_cancel(mdsc, ci);
1683
else if (!is_delayed || force_requeue)
1684
__cap_delay_requeue(mdsc, ci);
1685
1686
spin_unlock(&inode->i_lock);
1687
1688
if (queue_invalidate)
1689
ceph_queue_invalidate(inode);
1690
1691
if (session)
1692
mutex_unlock(&session->s_mutex);
1693
if (took_snap_rwsem)
1694
up_read(&mdsc->snap_rwsem);
1695
}
1696
1697
/*
1698
* Try to flush dirty caps back to the auth mds.
1699
*/
1700
static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
1701
unsigned *flush_tid)
1702
{
1703
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1704
struct ceph_inode_info *ci = ceph_inode(inode);
1705
int unlock_session = session ? 0 : 1;
1706
int flushing = 0;
1707
1708
retry:
1709
spin_lock(&inode->i_lock);
1710
if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
1711
dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
1712
goto out;
1713
}
1714
if (ci->i_dirty_caps && ci->i_auth_cap) {
1715
struct ceph_cap *cap = ci->i_auth_cap;
1716
int used = __ceph_caps_used(ci);
1717
int want = __ceph_caps_wanted(ci);
1718
int delayed;
1719
1720
if (!session) {
1721
spin_unlock(&inode->i_lock);
1722
session = cap->session;
1723
mutex_lock(&session->s_mutex);
1724
goto retry;
1725
}
1726
BUG_ON(session != cap->session);
1727
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
1728
goto out;
1729
1730
flushing = __mark_caps_flushing(inode, session);
1731
1732
/* __send_cap drops i_lock */
1733
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
1734
cap->issued | cap->implemented, flushing,
1735
flush_tid);
1736
if (!delayed)
1737
goto out_unlocked;
1738
1739
spin_lock(&inode->i_lock);
1740
__cap_delay_requeue(mdsc, ci);
1741
}
1742
out:
1743
spin_unlock(&inode->i_lock);
1744
out_unlocked:
1745
if (session && unlock_session)
1746
mutex_unlock(&session->s_mutex);
1747
return flushing;
1748
}
1749
1750
/*
1751
* Return true if we've flushed caps through the given flush_tid.
1752
*/
1753
static int caps_are_flushed(struct inode *inode, unsigned tid)
1754
{
1755
struct ceph_inode_info *ci = ceph_inode(inode);
1756
int i, ret = 1;
1757
1758
spin_lock(&inode->i_lock);
1759
for (i = 0; i < CEPH_CAP_BITS; i++)
1760
if ((ci->i_flushing_caps & (1 << i)) &&
1761
ci->i_cap_flush_tid[i] <= tid) {
1762
/* still flushing this bit */
1763
ret = 0;
1764
break;
1765
}
1766
spin_unlock(&inode->i_lock);
1767
return ret;
1768
}
1769
1770
/*
1771
* Wait on any unsafe replies for the given inode. First wait on the
1772
* newest request, and make that the upper bound. Then, if there are
1773
* more requests, keep waiting on the oldest as long as it is still older
1774
* than the original request.
1775
*/
1776
static void sync_write_wait(struct inode *inode)
1777
{
1778
struct ceph_inode_info *ci = ceph_inode(inode);
1779
struct list_head *head = &ci->i_unsafe_writes;
1780
struct ceph_osd_request *req;
1781
u64 last_tid;
1782
1783
spin_lock(&ci->i_unsafe_lock);
1784
if (list_empty(head))
1785
goto out;
1786
1787
/* set upper bound as _last_ entry in chain */
1788
req = list_entry(head->prev, struct ceph_osd_request,
1789
r_unsafe_item);
1790
last_tid = req->r_tid;
1791
1792
do {
1793
ceph_osdc_get_request(req);
1794
spin_unlock(&ci->i_unsafe_lock);
1795
dout("sync_write_wait on tid %llu (until %llu)\n",
1796
req->r_tid, last_tid);
1797
wait_for_completion(&req->r_safe_completion);
1798
spin_lock(&ci->i_unsafe_lock);
1799
ceph_osdc_put_request(req);
1800
1801
/*
1802
* from here on look at first entry in chain, since we
1803
* only want to wait for anything older than last_tid
1804
*/
1805
if (list_empty(head))
1806
break;
1807
req = list_entry(head->next, struct ceph_osd_request,
1808
r_unsafe_item);
1809
} while (req->r_tid < last_tid);
1810
out:
1811
spin_unlock(&ci->i_unsafe_lock);
1812
}
1813
1814
int ceph_fsync(struct file *file, int datasync)
1815
{
1816
struct inode *inode = file->f_mapping->host;
1817
struct ceph_inode_info *ci = ceph_inode(inode);
1818
unsigned flush_tid;
1819
int ret;
1820
int dirty;
1821
1822
dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
1823
sync_write_wait(inode);
1824
1825
ret = filemap_write_and_wait(inode->i_mapping);
1826
if (ret < 0)
1827
return ret;
1828
1829
dirty = try_flush_caps(inode, NULL, &flush_tid);
1830
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
1831
1832
/*
1833
* only wait on non-file metadata writeback (the mds
1834
* can recover size and mtime, so we don't need to
1835
* wait for that)
1836
*/
1837
if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
1838
dout("fsync waiting for flush_tid %u\n", flush_tid);
1839
ret = wait_event_interruptible(ci->i_cap_wq,
1840
caps_are_flushed(inode, flush_tid));
1841
}
1842
1843
dout("fsync %p%s done\n", inode, datasync ? " datasync" : "");
1844
return ret;
1845
}
1846
1847
/*
1848
* Flush any dirty caps back to the mds. If we aren't asked to wait,
1849
* queue inode for flush but don't do so immediately, because we can
1850
* get by with fewer MDS messages if we wait for data writeback to
1851
* complete first.
1852
*/
1853
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
1854
{
1855
struct ceph_inode_info *ci = ceph_inode(inode);
1856
unsigned flush_tid;
1857
int err = 0;
1858
int dirty;
1859
int wait = wbc->sync_mode == WB_SYNC_ALL;
1860
1861
dout("write_inode %p wait=%d\n", inode, wait);
1862
if (wait) {
1863
dirty = try_flush_caps(inode, NULL, &flush_tid);
1864
if (dirty)
1865
err = wait_event_interruptible(ci->i_cap_wq,
1866
caps_are_flushed(inode, flush_tid));
1867
} else {
1868
struct ceph_mds_client *mdsc =
1869
ceph_sb_to_client(inode->i_sb)->mdsc;
1870
1871
spin_lock(&inode->i_lock);
1872
if (__ceph_caps_dirty(ci))
1873
__cap_delay_requeue_front(mdsc, ci);
1874
spin_unlock(&inode->i_lock);
1875
}
1876
return err;
1877
}
1878
1879
/*
1880
* After a recovering MDS goes active, we need to resend any caps
1881
* we were flushing.
1882
*
1883
* Caller holds session->s_mutex.
1884
*/
1885
static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
1886
struct ceph_mds_session *session)
1887
{
1888
struct ceph_cap_snap *capsnap;
1889
1890
dout("kick_flushing_capsnaps mds%d\n", session->s_mds);
1891
list_for_each_entry(capsnap, &session->s_cap_snaps_flushing,
1892
flushing_item) {
1893
struct ceph_inode_info *ci = capsnap->ci;
1894
struct inode *inode = &ci->vfs_inode;
1895
struct ceph_cap *cap;
1896
1897
spin_lock(&inode->i_lock);
1898
cap = ci->i_auth_cap;
1899
if (cap && cap->session == session) {
1900
dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
1901
cap, capsnap);
1902
__ceph_flush_snaps(ci, &session, 1);
1903
} else {
1904
pr_err("%p auth cap %p not mds%d ???\n", inode,
1905
cap, session->s_mds);
1906
}
1907
spin_unlock(&inode->i_lock);
1908
}
1909
}
1910
1911
void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
1912
struct ceph_mds_session *session)
1913
{
1914
struct ceph_inode_info *ci;
1915
1916
kick_flushing_capsnaps(mdsc, session);
1917
1918
dout("kick_flushing_caps mds%d\n", session->s_mds);
1919
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
1920
struct inode *inode = &ci->vfs_inode;
1921
struct ceph_cap *cap;
1922
int delayed = 0;
1923
1924
spin_lock(&inode->i_lock);
1925
cap = ci->i_auth_cap;
1926
if (cap && cap->session == session) {
1927
dout("kick_flushing_caps %p cap %p %s\n", inode,
1928
cap, ceph_cap_string(ci->i_flushing_caps));
1929
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
1930
__ceph_caps_used(ci),
1931
__ceph_caps_wanted(ci),
1932
cap->issued | cap->implemented,
1933
ci->i_flushing_caps, NULL);
1934
if (delayed) {
1935
spin_lock(&inode->i_lock);
1936
__cap_delay_requeue(mdsc, ci);
1937
spin_unlock(&inode->i_lock);
1938
}
1939
} else {
1940
pr_err("%p auth cap %p not mds%d ???\n", inode,
1941
cap, session->s_mds);
1942
spin_unlock(&inode->i_lock);
1943
}
1944
}
1945
}
1946
1947
static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
1948
struct ceph_mds_session *session,
1949
struct inode *inode)
1950
{
1951
struct ceph_inode_info *ci = ceph_inode(inode);
1952
struct ceph_cap *cap;
1953
int delayed = 0;
1954
1955
spin_lock(&inode->i_lock);
1956
cap = ci->i_auth_cap;
1957
dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
1958
ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
1959
__ceph_flush_snaps(ci, &session, 1);
1960
if (ci->i_flushing_caps) {
1961
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
1962
__ceph_caps_used(ci),
1963
__ceph_caps_wanted(ci),
1964
cap->issued | cap->implemented,
1965
ci->i_flushing_caps, NULL);
1966
if (delayed) {
1967
spin_lock(&inode->i_lock);
1968
__cap_delay_requeue(mdsc, ci);
1969
spin_unlock(&inode->i_lock);
1970
}
1971
} else {
1972
spin_unlock(&inode->i_lock);
1973
}
1974
}
1975
1976
1977
/*
1978
* Take references to capabilities we hold, so that we don't release
1979
* them to the MDS prematurely.
1980
*
1981
* Protected by i_lock.
1982
*/
1983
static void __take_cap_refs(struct ceph_inode_info *ci, int got)
1984
{
1985
if (got & CEPH_CAP_PIN)
1986
ci->i_pin_ref++;
1987
if (got & CEPH_CAP_FILE_RD)
1988
ci->i_rd_ref++;
1989
if (got & CEPH_CAP_FILE_CACHE)
1990
ci->i_rdcache_ref++;
1991
if (got & CEPH_CAP_FILE_WR)
1992
ci->i_wr_ref++;
1993
if (got & CEPH_CAP_FILE_BUFFER) {
1994
if (ci->i_wb_ref == 0)
1995
ihold(&ci->vfs_inode);
1996
ci->i_wb_ref++;
1997
dout("__take_cap_refs %p wb %d -> %d (?)\n",
1998
&ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
1999
}
2000
}
2001
2002
/*
2003
* Try to grab cap references. Specify those refs we @want, and the
2004
* minimal set we @need. Also include the larger offset we are writing
2005
* to (when applicable), and check against max_size here as well.
2006
* Note that caller is responsible for ensuring max_size increases are
2007
* requested from the MDS.
2008
*/
2009
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2010
int *got, loff_t endoff, int *check_max, int *err)
2011
{
2012
struct inode *inode = &ci->vfs_inode;
2013
int ret = 0;
2014
int have, implemented;
2015
int file_wanted;
2016
2017
dout("get_cap_refs %p need %s want %s\n", inode,
2018
ceph_cap_string(need), ceph_cap_string(want));
2019
spin_lock(&inode->i_lock);
2020
2021
/* make sure file is actually open */
2022
file_wanted = __ceph_caps_file_wanted(ci);
2023
if ((file_wanted & need) == 0) {
2024
dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
2025
ceph_cap_string(need), ceph_cap_string(file_wanted));
2026
*err = -EBADF;
2027
ret = 1;
2028
goto out;
2029
}
2030
2031
if (need & CEPH_CAP_FILE_WR) {
2032
if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
2033
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
2034
inode, endoff, ci->i_max_size);
2035
if (endoff > ci->i_wanted_max_size) {
2036
*check_max = 1;
2037
ret = 1;
2038
}
2039
goto out;
2040
}
2041
/*
2042
* If a sync write is in progress, we must wait, so that we
2043
* can get a final snapshot value for size+mtime.
2044
*/
2045
if (__ceph_have_pending_cap_snap(ci)) {
2046
dout("get_cap_refs %p cap_snap_pending\n", inode);
2047
goto out;
2048
}
2049
}
2050
have = __ceph_caps_issued(ci, &implemented);
2051
2052
/*
2053
* disallow writes while a truncate is pending
2054
*/
2055
if (ci->i_truncate_pending)
2056
have &= ~CEPH_CAP_FILE_WR;
2057
2058
if ((have & need) == need) {
2059
/*
2060
* Look at (implemented & ~have & not) so that we keep waiting
2061
* on transition from wanted -> needed caps. This is needed
2062
* for WRBUFFER|WR -> WR to avoid a new WR sync write from
2063
* going before a prior buffered writeback happens.
2064
*/
2065
int not = want & ~(have & need);
2066
int revoking = implemented & ~have;
2067
dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
2068
inode, ceph_cap_string(have), ceph_cap_string(not),
2069
ceph_cap_string(revoking));
2070
if ((revoking & not) == 0) {
2071
*got = need | (have & want);
2072
__take_cap_refs(ci, *got);
2073
ret = 1;
2074
}
2075
} else {
2076
dout("get_cap_refs %p have %s needed %s\n", inode,
2077
ceph_cap_string(have), ceph_cap_string(need));
2078
}
2079
out:
2080
spin_unlock(&inode->i_lock);
2081
dout("get_cap_refs %p ret %d got %s\n", inode,
2082
ret, ceph_cap_string(*got));
2083
return ret;
2084
}
2085
2086
/*
2087
* Check the offset we are writing up to against our current
2088
* max_size. If necessary, tell the MDS we want to write to
2089
* a larger offset.
2090
*/
2091
static void check_max_size(struct inode *inode, loff_t endoff)
2092
{
2093
struct ceph_inode_info *ci = ceph_inode(inode);
2094
int check = 0;
2095
2096
/* do we need to explicitly request a larger max_size? */
2097
spin_lock(&inode->i_lock);
2098
if ((endoff >= ci->i_max_size ||
2099
endoff > (inode->i_size << 1)) &&
2100
endoff > ci->i_wanted_max_size) {
2101
dout("write %p at large endoff %llu, req max_size\n",
2102
inode, endoff);
2103
ci->i_wanted_max_size = endoff;
2104
check = 1;
2105
}
2106
spin_unlock(&inode->i_lock);
2107
if (check)
2108
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
2109
}
2110
2111
/*
2112
* Wait for caps, and take cap references. If we can't get a WR cap
2113
* due to a small max_size, make sure we check_max_size (and possibly
2114
* ask the mds) so we don't get hung up indefinitely.
2115
*/
2116
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
2117
loff_t endoff)
2118
{
2119
int check_max, ret, err;
2120
2121
retry:
2122
if (endoff > 0)
2123
check_max_size(&ci->vfs_inode, endoff);
2124
check_max = 0;
2125
err = 0;
2126
ret = wait_event_interruptible(ci->i_cap_wq,
2127
try_get_cap_refs(ci, need, want,
2128
got, endoff,
2129
&check_max, &err));
2130
if (err)
2131
ret = err;
2132
if (check_max)
2133
goto retry;
2134
return ret;
2135
}
2136
2137
/*
2138
* Take cap refs. Caller must already know we hold at least one ref
2139
* on the caps in question or we don't know this is safe.
2140
*/
2141
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
2142
{
2143
spin_lock(&ci->vfs_inode.i_lock);
2144
__take_cap_refs(ci, caps);
2145
spin_unlock(&ci->vfs_inode.i_lock);
2146
}
2147
2148
/*
2149
* Release cap refs.
2150
*
2151
* If we released the last ref on any given cap, call ceph_check_caps
2152
* to release (or schedule a release).
2153
*
2154
* If we are releasing a WR cap (from a sync write), finalize any affected
2155
* cap_snap, and wake up any waiters.
2156
*/
2157
void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
2158
{
2159
struct inode *inode = &ci->vfs_inode;
2160
int last = 0, put = 0, flushsnaps = 0, wake = 0;
2161
struct ceph_cap_snap *capsnap;
2162
2163
spin_lock(&inode->i_lock);
2164
if (had & CEPH_CAP_PIN)
2165
--ci->i_pin_ref;
2166
if (had & CEPH_CAP_FILE_RD)
2167
if (--ci->i_rd_ref == 0)
2168
last++;
2169
if (had & CEPH_CAP_FILE_CACHE)
2170
if (--ci->i_rdcache_ref == 0)
2171
last++;
2172
if (had & CEPH_CAP_FILE_BUFFER) {
2173
if (--ci->i_wb_ref == 0) {
2174
last++;
2175
put++;
2176
}
2177
dout("put_cap_refs %p wb %d -> %d (?)\n",
2178
inode, ci->i_wb_ref+1, ci->i_wb_ref);
2179
}
2180
if (had & CEPH_CAP_FILE_WR)
2181
if (--ci->i_wr_ref == 0) {
2182
last++;
2183
if (!list_empty(&ci->i_cap_snaps)) {
2184
capsnap = list_first_entry(&ci->i_cap_snaps,
2185
struct ceph_cap_snap,
2186
ci_item);
2187
if (capsnap->writing) {
2188
capsnap->writing = 0;
2189
flushsnaps =
2190
__ceph_finish_cap_snap(ci,
2191
capsnap);
2192
wake = 1;
2193
}
2194
}
2195
}
2196
spin_unlock(&inode->i_lock);
2197
2198
dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
2199
last ? " last" : "", put ? " put" : "");
2200
2201
if (last && !flushsnaps)
2202
ceph_check_caps(ci, 0, NULL);
2203
else if (flushsnaps)
2204
ceph_flush_snaps(ci);
2205
if (wake)
2206
wake_up_all(&ci->i_cap_wq);
2207
if (put)
2208
iput(inode);
2209
}
2210
2211
/*
2212
* Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
2213
* context. Adjust per-snap dirty page accounting as appropriate.
2214
* Once all dirty data for a cap_snap is flushed, flush snapped file
2215
* metadata back to the MDS. If we dropped the last ref, call
2216
* ceph_check_caps.
2217
*/
2218
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
2219
struct ceph_snap_context *snapc)
2220
{
2221
struct inode *inode = &ci->vfs_inode;
2222
int last = 0;
2223
int complete_capsnap = 0;
2224
int drop_capsnap = 0;
2225
int found = 0;
2226
struct ceph_cap_snap *capsnap = NULL;
2227
2228
spin_lock(&inode->i_lock);
2229
ci->i_wrbuffer_ref -= nr;
2230
last = !ci->i_wrbuffer_ref;
2231
2232
if (ci->i_head_snapc == snapc) {
2233
ci->i_wrbuffer_ref_head -= nr;
2234
if (ci->i_wrbuffer_ref_head == 0 &&
2235
ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
2236
BUG_ON(!ci->i_head_snapc);
2237
ceph_put_snap_context(ci->i_head_snapc);
2238
ci->i_head_snapc = NULL;
2239
}
2240
dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
2241
inode,
2242
ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
2243
ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
2244
last ? " LAST" : "");
2245
} else {
2246
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
2247
if (capsnap->context == snapc) {
2248
found = 1;
2249
break;
2250
}
2251
}
2252
BUG_ON(!found);
2253
capsnap->dirty_pages -= nr;
2254
if (capsnap->dirty_pages == 0) {
2255
complete_capsnap = 1;
2256
if (capsnap->dirty == 0)
2257
/* cap writeback completed before we created
2258
* the cap_snap; no FLUSHSNAP is needed */
2259
drop_capsnap = 1;
2260
}
2261
dout("put_wrbuffer_cap_refs on %p cap_snap %p "
2262
" snap %lld %d/%d -> %d/%d %s%s%s\n",
2263
inode, capsnap, capsnap->context->seq,
2264
ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
2265
ci->i_wrbuffer_ref, capsnap->dirty_pages,
2266
last ? " (wrbuffer last)" : "",
2267
complete_capsnap ? " (complete capsnap)" : "",
2268
drop_capsnap ? " (drop capsnap)" : "");
2269
if (drop_capsnap) {
2270
ceph_put_snap_context(capsnap->context);
2271
list_del(&capsnap->ci_item);
2272
list_del(&capsnap->flushing_item);
2273
ceph_put_cap_snap(capsnap);
2274
}
2275
}
2276
2277
spin_unlock(&inode->i_lock);
2278
2279
if (last) {
2280
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
2281
iput(inode);
2282
} else if (complete_capsnap) {
2283
ceph_flush_snaps(ci);
2284
wake_up_all(&ci->i_cap_wq);
2285
}
2286
if (drop_capsnap)
2287
iput(inode);
2288
}
2289
2290
/*
2291
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
2292
* actually be a revocation if it specifies a smaller cap set.)
2293
*
2294
* caller holds s_mutex and i_lock, we drop both.
2295
*
2296
* return value:
2297
* 0 - ok
2298
* 1 - check_caps on auth cap only (writeback)
2299
* 2 - check_caps (ack revoke)
2300
*/
2301
static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
2302
struct ceph_mds_session *session,
2303
struct ceph_cap *cap,
2304
struct ceph_buffer *xattr_buf)
2305
__releases(inode->i_lock)
2306
{
2307
struct ceph_inode_info *ci = ceph_inode(inode);
2308
int mds = session->s_mds;
2309
int seq = le32_to_cpu(grant->seq);
2310
int newcaps = le32_to_cpu(grant->caps);
2311
int issued, implemented, used, wanted, dirty;
2312
u64 size = le64_to_cpu(grant->size);
2313
u64 max_size = le64_to_cpu(grant->max_size);
2314
struct timespec mtime, atime, ctime;
2315
int check_caps = 0;
2316
int wake = 0;
2317
int writeback = 0;
2318
int revoked_rdcache = 0;
2319
int queue_invalidate = 0;
2320
2321
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
2322
inode, cap, mds, seq, ceph_cap_string(newcaps));
2323
dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
2324
inode->i_size);
2325
2326
/*
2327
* If CACHE is being revoked, and we have no dirty buffers,
2328
* try to invalidate (once). (If there are dirty buffers, we
2329
* will invalidate _after_ writeback.)
2330
*/
2331
if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
2332
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2333
!ci->i_wrbuffer_ref) {
2334
if (try_nonblocking_invalidate(inode) == 0) {
2335
revoked_rdcache = 1;
2336
} else {
2337
/* there were locked pages.. invalidate later
2338
in a separate thread. */
2339
if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
2340
queue_invalidate = 1;
2341
ci->i_rdcache_revoking = ci->i_rdcache_gen;
2342
}
2343
}
2344
}
2345
2346
/* side effects now are allowed */
2347
2348
issued = __ceph_caps_issued(ci, &implemented);
2349
issued |= implemented | __ceph_caps_dirty(ci);
2350
2351
cap->cap_gen = session->s_cap_gen;
2352
2353
__check_cap_issue(ci, cap, newcaps);
2354
2355
if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
2356
inode->i_mode = le32_to_cpu(grant->mode);
2357
inode->i_uid = le32_to_cpu(grant->uid);
2358
inode->i_gid = le32_to_cpu(grant->gid);
2359
dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
2360
inode->i_uid, inode->i_gid);
2361
}
2362
2363
if ((issued & CEPH_CAP_LINK_EXCL) == 0)
2364
inode->i_nlink = le32_to_cpu(grant->nlink);
2365
2366
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
2367
int len = le32_to_cpu(grant->xattr_len);
2368
u64 version = le64_to_cpu(grant->xattr_version);
2369
2370
if (version > ci->i_xattrs.version) {
2371
dout(" got new xattrs v%llu on %p len %d\n",
2372
version, inode, len);
2373
if (ci->i_xattrs.blob)
2374
ceph_buffer_put(ci->i_xattrs.blob);
2375
ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
2376
ci->i_xattrs.version = version;
2377
}
2378
}
2379
2380
/* size/ctime/mtime/atime? */
2381
ceph_fill_file_size(inode, issued,
2382
le32_to_cpu(grant->truncate_seq),
2383
le64_to_cpu(grant->truncate_size), size);
2384
ceph_decode_timespec(&mtime, &grant->mtime);
2385
ceph_decode_timespec(&atime, &grant->atime);
2386
ceph_decode_timespec(&ctime, &grant->ctime);
2387
ceph_fill_file_time(inode, issued,
2388
le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
2389
&atime);
2390
2391
/* max size increase? */
2392
if (max_size != ci->i_max_size) {
2393
dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
2394
ci->i_max_size = max_size;
2395
if (max_size >= ci->i_wanted_max_size) {
2396
ci->i_wanted_max_size = 0; /* reset */
2397
ci->i_requested_max_size = 0;
2398
}
2399
wake = 1;
2400
}
2401
2402
/* check cap bits */
2403
wanted = __ceph_caps_wanted(ci);
2404
used = __ceph_caps_used(ci);
2405
dirty = __ceph_caps_dirty(ci);
2406
dout(" my wanted = %s, used = %s, dirty %s\n",
2407
ceph_cap_string(wanted),
2408
ceph_cap_string(used),
2409
ceph_cap_string(dirty));
2410
if (wanted != le32_to_cpu(grant->wanted)) {
2411
dout("mds wanted %s -> %s\n",
2412
ceph_cap_string(le32_to_cpu(grant->wanted)),
2413
ceph_cap_string(wanted));
2414
grant->wanted = cpu_to_le32(wanted);
2415
}
2416
2417
cap->seq = seq;
2418
2419
/* file layout may have changed */
2420
ci->i_layout = grant->layout;
2421
2422
/* revocation, grant, or no-op? */
2423
if (cap->issued & ~newcaps) {
2424
int revoking = cap->issued & ~newcaps;
2425
2426
dout("revocation: %s -> %s (revoking %s)\n",
2427
ceph_cap_string(cap->issued),
2428
ceph_cap_string(newcaps),
2429
ceph_cap_string(revoking));
2430
if (revoking & used & CEPH_CAP_FILE_BUFFER)
2431
writeback = 1; /* initiate writeback; will delay ack */
2432
else if (revoking == CEPH_CAP_FILE_CACHE &&
2433
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
2434
queue_invalidate)
2435
; /* do nothing yet, invalidation will be queued */
2436
else if (cap == ci->i_auth_cap)
2437
check_caps = 1; /* check auth cap only */
2438
else
2439
check_caps = 2; /* check all caps */
2440
cap->issued = newcaps;
2441
cap->implemented |= newcaps;
2442
} else if (cap->issued == newcaps) {
2443
dout("caps unchanged: %s -> %s\n",
2444
ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
2445
} else {
2446
dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
2447
ceph_cap_string(newcaps));
2448
cap->issued = newcaps;
2449
cap->implemented |= newcaps; /* add bits only, to
2450
* avoid stepping on a
2451
* pending revocation */
2452
wake = 1;
2453
}
2454
BUG_ON(cap->issued & ~cap->implemented);
2455
2456
spin_unlock(&inode->i_lock);
2457
if (writeback)
2458
/*
2459
* queue inode for writeback: we can't actually call
2460
* filemap_write_and_wait, etc. from message handler
2461
* context.
2462
*/
2463
ceph_queue_writeback(inode);
2464
if (queue_invalidate)
2465
ceph_queue_invalidate(inode);
2466
if (wake)
2467
wake_up_all(&ci->i_cap_wq);
2468
2469
if (check_caps == 1)
2470
ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
2471
session);
2472
else if (check_caps == 2)
2473
ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
2474
else
2475
mutex_unlock(&session->s_mutex);
2476
}
2477
2478
/*
2479
* Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
2480
* MDS has been safely committed.
2481
*/
2482
static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
2483
struct ceph_mds_caps *m,
2484
struct ceph_mds_session *session,
2485
struct ceph_cap *cap)
2486
__releases(inode->i_lock)
2487
{
2488
struct ceph_inode_info *ci = ceph_inode(inode);
2489
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
2490
unsigned seq = le32_to_cpu(m->seq);
2491
int dirty = le32_to_cpu(m->dirty);
2492
int cleaned = 0;
2493
int drop = 0;
2494
int i;
2495
2496
for (i = 0; i < CEPH_CAP_BITS; i++)
2497
if ((dirty & (1 << i)) &&
2498
flush_tid == ci->i_cap_flush_tid[i])
2499
cleaned |= 1 << i;
2500
2501
dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
2502
" flushing %s -> %s\n",
2503
inode, session->s_mds, seq, ceph_cap_string(dirty),
2504
ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
2505
ceph_cap_string(ci->i_flushing_caps & ~cleaned));
2506
2507
if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned))
2508
goto out;
2509
2510
ci->i_flushing_caps &= ~cleaned;
2511
2512
spin_lock(&mdsc->cap_dirty_lock);
2513
if (ci->i_flushing_caps == 0) {
2514
list_del_init(&ci->i_flushing_item);
2515
if (!list_empty(&session->s_cap_flushing))
2516
dout(" mds%d still flushing cap on %p\n",
2517
session->s_mds,
2518
&list_entry(session->s_cap_flushing.next,
2519
struct ceph_inode_info,
2520
i_flushing_item)->vfs_inode);
2521
mdsc->num_cap_flushing--;
2522
wake_up_all(&mdsc->cap_flushing_wq);
2523
dout(" inode %p now !flushing\n", inode);
2524
2525
if (ci->i_dirty_caps == 0) {
2526
dout(" inode %p now clean\n", inode);
2527
BUG_ON(!list_empty(&ci->i_dirty_item));
2528
drop = 1;
2529
if (ci->i_wrbuffer_ref_head == 0) {
2530
BUG_ON(!ci->i_head_snapc);
2531
ceph_put_snap_context(ci->i_head_snapc);
2532
ci->i_head_snapc = NULL;
2533
}
2534
} else {
2535
BUG_ON(list_empty(&ci->i_dirty_item));
2536
}
2537
}
2538
spin_unlock(&mdsc->cap_dirty_lock);
2539
wake_up_all(&ci->i_cap_wq);
2540
2541
out:
2542
spin_unlock(&inode->i_lock);
2543
if (drop)
2544
iput(inode);
2545
}
2546
2547
/*
2548
* Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can
2549
* throw away our cap_snap.
2550
*
2551
* Caller hold s_mutex.
2552
*/
2553
static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
2554
struct ceph_mds_caps *m,
2555
struct ceph_mds_session *session)
2556
{
2557
struct ceph_inode_info *ci = ceph_inode(inode);
2558
u64 follows = le64_to_cpu(m->snap_follows);
2559
struct ceph_cap_snap *capsnap;
2560
int drop = 0;
2561
2562
dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
2563
inode, ci, session->s_mds, follows);
2564
2565
spin_lock(&inode->i_lock);
2566
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
2567
if (capsnap->follows == follows) {
2568
if (capsnap->flush_tid != flush_tid) {
2569
dout(" cap_snap %p follows %lld tid %lld !="
2570
" %lld\n", capsnap, follows,
2571
flush_tid, capsnap->flush_tid);
2572
break;
2573
}
2574
WARN_ON(capsnap->dirty_pages || capsnap->writing);
2575
dout(" removing %p cap_snap %p follows %lld\n",
2576
inode, capsnap, follows);
2577
ceph_put_snap_context(capsnap->context);
2578
list_del(&capsnap->ci_item);
2579
list_del(&capsnap->flushing_item);
2580
ceph_put_cap_snap(capsnap);
2581
drop = 1;
2582
break;
2583
} else {
2584
dout(" skipping cap_snap %p follows %lld\n",
2585
capsnap, capsnap->follows);
2586
}
2587
}
2588
spin_unlock(&inode->i_lock);
2589
if (drop)
2590
iput(inode);
2591
}
2592
2593
/*
2594
* Handle TRUNC from MDS, indicating file truncation.
2595
*
2596
* caller hold s_mutex.
2597
*/
2598
static void handle_cap_trunc(struct inode *inode,
2599
struct ceph_mds_caps *trunc,
2600
struct ceph_mds_session *session)
2601
__releases(inode->i_lock)
2602
{
2603
struct ceph_inode_info *ci = ceph_inode(inode);
2604
int mds = session->s_mds;
2605
int seq = le32_to_cpu(trunc->seq);
2606
u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
2607
u64 truncate_size = le64_to_cpu(trunc->truncate_size);
2608
u64 size = le64_to_cpu(trunc->size);
2609
int implemented = 0;
2610
int dirty = __ceph_caps_dirty(ci);
2611
int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
2612
int queue_trunc = 0;
2613
2614
issued |= implemented | dirty;
2615
2616
dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
2617
inode, mds, seq, truncate_size, truncate_seq);
2618
queue_trunc = ceph_fill_file_size(inode, issued,
2619
truncate_seq, truncate_size, size);
2620
spin_unlock(&inode->i_lock);
2621
2622
if (queue_trunc)
2623
ceph_queue_vmtruncate(inode);
2624
}
2625
2626
/*
2627
* Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a
2628
* different one. If we are the most recent migration we've seen (as
2629
* indicated by mseq), make note of the migrating cap bits for the
2630
* duration (until we see the corresponding IMPORT).
2631
*
2632
* caller holds s_mutex
2633
*/
2634
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
2635
struct ceph_mds_session *session,
2636
int *open_target_sessions)
2637
{
2638
struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
2639
struct ceph_inode_info *ci = ceph_inode(inode);
2640
int mds = session->s_mds;
2641
unsigned mseq = le32_to_cpu(ex->migrate_seq);
2642
struct ceph_cap *cap = NULL, *t;
2643
struct rb_node *p;
2644
int remember = 1;
2645
2646
dout("handle_cap_export inode %p ci %p mds%d mseq %d\n",
2647
inode, ci, mds, mseq);
2648
2649
spin_lock(&inode->i_lock);
2650
2651
/* make sure we haven't seen a higher mseq */
2652
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
2653
t = rb_entry(p, struct ceph_cap, ci_node);
2654
if (ceph_seq_cmp(t->mseq, mseq) > 0) {
2655
dout(" higher mseq on cap from mds%d\n",
2656
t->session->s_mds);
2657
remember = 0;
2658
}
2659
if (t->session->s_mds == mds)
2660
cap = t;
2661
}
2662
2663
if (cap) {
2664
if (remember) {
2665
/* make note */
2666
ci->i_cap_exporting_mds = mds;
2667
ci->i_cap_exporting_mseq = mseq;
2668
ci->i_cap_exporting_issued = cap->issued;
2669
2670
/*
2671
* make sure we have open sessions with all possible
2672
* export targets, so that we get the matching IMPORT
2673
*/
2674
*open_target_sessions = 1;
2675
2676
/*
2677
* we can't flush dirty caps that we've seen the
2678
* EXPORT but no IMPORT for
2679
*/
2680
spin_lock(&mdsc->cap_dirty_lock);
2681
if (!list_empty(&ci->i_dirty_item)) {
2682
dout(" moving %p to cap_dirty_migrating\n",
2683
inode);
2684
list_move(&ci->i_dirty_item,
2685
&mdsc->cap_dirty_migrating);
2686
}
2687
spin_unlock(&mdsc->cap_dirty_lock);
2688
}
2689
__ceph_remove_cap(cap);
2690
}
2691
/* else, we already released it */
2692
2693
spin_unlock(&inode->i_lock);
2694
}
2695
2696
/*
2697
* Handle cap IMPORT. If there are temp bits from an older EXPORT,
2698
* clean them up.
2699
*
2700
* caller holds s_mutex.
2701
*/
2702
static void handle_cap_import(struct ceph_mds_client *mdsc,
2703
struct inode *inode, struct ceph_mds_caps *im,
2704
struct ceph_mds_session *session,
2705
void *snaptrace, int snaptrace_len)
2706
{
2707
struct ceph_inode_info *ci = ceph_inode(inode);
2708
int mds = session->s_mds;
2709
unsigned issued = le32_to_cpu(im->caps);
2710
unsigned wanted = le32_to_cpu(im->wanted);
2711
unsigned seq = le32_to_cpu(im->seq);
2712
unsigned mseq = le32_to_cpu(im->migrate_seq);
2713
u64 realmino = le64_to_cpu(im->realm);
2714
u64 cap_id = le64_to_cpu(im->cap_id);
2715
2716
if (ci->i_cap_exporting_mds >= 0 &&
2717
ceph_seq_cmp(ci->i_cap_exporting_mseq, mseq) < 0) {
2718
dout("handle_cap_import inode %p ci %p mds%d mseq %d"
2719
" - cleared exporting from mds%d\n",
2720
inode, ci, mds, mseq,
2721
ci->i_cap_exporting_mds);
2722
ci->i_cap_exporting_issued = 0;
2723
ci->i_cap_exporting_mseq = 0;
2724
ci->i_cap_exporting_mds = -1;
2725
2726
spin_lock(&mdsc->cap_dirty_lock);
2727
if (!list_empty(&ci->i_dirty_item)) {
2728
dout(" moving %p back to cap_dirty\n", inode);
2729
list_move(&ci->i_dirty_item, &mdsc->cap_dirty);
2730
}
2731
spin_unlock(&mdsc->cap_dirty_lock);
2732
} else {
2733
dout("handle_cap_import inode %p ci %p mds%d mseq %d\n",
2734
inode, ci, mds, mseq);
2735
}
2736
2737
down_write(&mdsc->snap_rwsem);
2738
ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
2739
false);
2740
downgrade_write(&mdsc->snap_rwsem);
2741
ceph_add_cap(inode, session, cap_id, -1,
2742
issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
2743
NULL /* no caps context */);
2744
kick_flushing_inode_caps(mdsc, session, inode);
2745
up_read(&mdsc->snap_rwsem);
2746
2747
/* make sure we re-request max_size, if necessary */
2748
spin_lock(&inode->i_lock);
2749
ci->i_requested_max_size = 0;
2750
spin_unlock(&inode->i_lock);
2751
}
2752
2753
/*
2754
* Handle a caps message from the MDS.
2755
*
2756
* Identify the appropriate session, inode, and call the right handler
2757
* based on the cap op.
2758
*/
2759
void ceph_handle_caps(struct ceph_mds_session *session,
2760
struct ceph_msg *msg)
2761
{
2762
struct ceph_mds_client *mdsc = session->s_mdsc;
2763
struct super_block *sb = mdsc->fsc->sb;
2764
struct inode *inode;
2765
struct ceph_cap *cap;
2766
struct ceph_mds_caps *h;
2767
int mds = session->s_mds;
2768
int op;
2769
u32 seq, mseq;
2770
struct ceph_vino vino;
2771
u64 cap_id;
2772
u64 size, max_size;
2773
u64 tid;
2774
void *snaptrace;
2775
size_t snaptrace_len;
2776
void *flock;
2777
u32 flock_len;
2778
int open_target_sessions = 0;
2779
2780
dout("handle_caps from mds%d\n", mds);
2781
2782
/* decode */
2783
tid = le64_to_cpu(msg->hdr.tid);
2784
if (msg->front.iov_len < sizeof(*h))
2785
goto bad;
2786
h = msg->front.iov_base;
2787
op = le32_to_cpu(h->op);
2788
vino.ino = le64_to_cpu(h->ino);
2789
vino.snap = CEPH_NOSNAP;
2790
cap_id = le64_to_cpu(h->cap_id);
2791
seq = le32_to_cpu(h->seq);
2792
mseq = le32_to_cpu(h->migrate_seq);
2793
size = le64_to_cpu(h->size);
2794
max_size = le64_to_cpu(h->max_size);
2795
2796
snaptrace = h + 1;
2797
snaptrace_len = le32_to_cpu(h->snap_trace_len);
2798
2799
if (le16_to_cpu(msg->hdr.version) >= 2) {
2800
void *p, *end;
2801
2802
p = snaptrace + snaptrace_len;
2803
end = msg->front.iov_base + msg->front.iov_len;
2804
ceph_decode_32_safe(&p, end, flock_len, bad);
2805
flock = p;
2806
} else {
2807
flock = NULL;
2808
flock_len = 0;
2809
}
2810
2811
mutex_lock(&session->s_mutex);
2812
session->s_seq++;
2813
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
2814
(unsigned)seq);
2815
2816
/* lookup ino */
2817
inode = ceph_find_inode(sb, vino);
2818
dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
2819
vino.snap, inode);
2820
if (!inode) {
2821
dout(" i don't have ino %llx\n", vino.ino);
2822
2823
if (op == CEPH_CAP_OP_IMPORT)
2824
__queue_cap_release(session, vino.ino, cap_id,
2825
mseq, seq);
2826
goto flush_cap_releases;
2827
}
2828
2829
/* these will work even if we don't have a cap yet */
2830
switch (op) {
2831
case CEPH_CAP_OP_FLUSHSNAP_ACK:
2832
handle_cap_flushsnap_ack(inode, tid, h, session);
2833
goto done;
2834
2835
case CEPH_CAP_OP_EXPORT:
2836
handle_cap_export(inode, h, session, &open_target_sessions);
2837
goto done;
2838
2839
case CEPH_CAP_OP_IMPORT:
2840
handle_cap_import(mdsc, inode, h, session,
2841
snaptrace, snaptrace_len);
2842
ceph_check_caps(ceph_inode(inode), 0, session);
2843
goto done_unlocked;
2844
}
2845
2846
/* the rest require a cap */
2847
spin_lock(&inode->i_lock);
2848
cap = __get_cap_for_mds(ceph_inode(inode), mds);
2849
if (!cap) {
2850
dout(" no cap on %p ino %llx.%llx from mds%d\n",
2851
inode, ceph_ino(inode), ceph_snap(inode), mds);
2852
spin_unlock(&inode->i_lock);
2853
goto flush_cap_releases;
2854
}
2855
2856
/* note that each of these drops i_lock for us */
2857
switch (op) {
2858
case CEPH_CAP_OP_REVOKE:
2859
case CEPH_CAP_OP_GRANT:
2860
handle_cap_grant(inode, h, session, cap, msg->middle);
2861
goto done_unlocked;
2862
2863
case CEPH_CAP_OP_FLUSH_ACK:
2864
handle_cap_flush_ack(inode, tid, h, session, cap);
2865
break;
2866
2867
case CEPH_CAP_OP_TRUNC:
2868
handle_cap_trunc(inode, h, session);
2869
break;
2870
2871
default:
2872
spin_unlock(&inode->i_lock);
2873
pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
2874
ceph_cap_op_name(op));
2875
}
2876
2877
goto done;
2878
2879
flush_cap_releases:
2880
/*
2881
* send any full release message to try to move things
2882
* along for the mds (who clearly thinks we still have this
2883
* cap).
2884
*/
2885
ceph_add_cap_releases(mdsc, session);
2886
ceph_send_cap_releases(mdsc, session);
2887
2888
done:
2889
mutex_unlock(&session->s_mutex);
2890
done_unlocked:
2891
if (inode)
2892
iput(inode);
2893
if (open_target_sessions)
2894
ceph_mdsc_open_export_target_sessions(mdsc, session);
2895
return;
2896
2897
bad:
2898
pr_err("ceph_handle_caps: corrupt message\n");
2899
ceph_msg_dump(msg);
2900
return;
2901
}
2902
2903
/*
2904
* Delayed work handler to process end of delayed cap release LRU list.
2905
*/
2906
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
2907
{
2908
struct ceph_inode_info *ci;
2909
int flags = CHECK_CAPS_NODELAY;
2910
2911
dout("check_delayed_caps\n");
2912
while (1) {
2913
spin_lock(&mdsc->cap_delay_lock);
2914
if (list_empty(&mdsc->cap_delay_list))
2915
break;
2916
ci = list_first_entry(&mdsc->cap_delay_list,
2917
struct ceph_inode_info,
2918
i_cap_delay_list);
2919
if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
2920
time_before(jiffies, ci->i_hold_caps_max))
2921
break;
2922
list_del_init(&ci->i_cap_delay_list);
2923
spin_unlock(&mdsc->cap_delay_lock);
2924
dout("check_delayed_caps on %p\n", &ci->vfs_inode);
2925
ceph_check_caps(ci, flags, NULL);
2926
}
2927
spin_unlock(&mdsc->cap_delay_lock);
2928
}
2929
2930
/*
2931
* Flush all dirty caps to the mds
2932
*/
2933
void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
2934
{
2935
struct ceph_inode_info *ci;
2936
struct inode *inode;
2937
2938
dout("flush_dirty_caps\n");
2939
spin_lock(&mdsc->cap_dirty_lock);
2940
while (!list_empty(&mdsc->cap_dirty)) {
2941
ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
2942
i_dirty_item);
2943
inode = &ci->vfs_inode;
2944
ihold(inode);
2945
dout("flush_dirty_caps %p\n", inode);
2946
spin_unlock(&mdsc->cap_dirty_lock);
2947
ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
2948
iput(inode);
2949
spin_lock(&mdsc->cap_dirty_lock);
2950
}
2951
spin_unlock(&mdsc->cap_dirty_lock);
2952
dout("flush_dirty_caps done\n");
2953
}
2954
2955
/*
2956
* Drop open file reference. If we were the last open file,
2957
* we may need to release capabilities to the MDS (or schedule
2958
* their delayed release).
2959
*/
2960
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
2961
{
2962
struct inode *inode = &ci->vfs_inode;
2963
int last = 0;
2964
2965
spin_lock(&inode->i_lock);
2966
dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode,
2967
ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1);
2968
BUG_ON(ci->i_nr_by_mode[fmode] == 0);
2969
if (--ci->i_nr_by_mode[fmode] == 0)
2970
last++;
2971
spin_unlock(&inode->i_lock);
2972
2973
if (last && ci->i_vino.snap == CEPH_NOSNAP)
2974
ceph_check_caps(ci, 0, NULL);
2975
}
2976
2977
/*
2978
* Helpers for embedding cap and dentry lease releases into mds
2979
* requests.
2980
*
2981
* @force is used by dentry_release (below) to force inclusion of a
2982
* record for the directory inode, even when there aren't any caps to
2983
* drop.
2984
*/
2985
int ceph_encode_inode_release(void **p, struct inode *inode,
2986
int mds, int drop, int unless, int force)
2987
{
2988
struct ceph_inode_info *ci = ceph_inode(inode);
2989
struct ceph_cap *cap;
2990
struct ceph_mds_request_release *rel = *p;
2991
int used, dirty;
2992
int ret = 0;
2993
2994
spin_lock(&inode->i_lock);
2995
used = __ceph_caps_used(ci);
2996
dirty = __ceph_caps_dirty(ci);
2997
2998
dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
2999
inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
3000
ceph_cap_string(unless));
3001
3002
/* only drop unused, clean caps */
3003
drop &= ~(used | dirty);
3004
3005
cap = __get_cap_for_mds(ci, mds);
3006
if (cap && __cap_is_valid(cap)) {
3007
if (force ||
3008
((cap->issued & drop) &&
3009
(cap->issued & unless) == 0)) {
3010
if ((cap->issued & drop) &&
3011
(cap->issued & unless) == 0) {
3012
dout("encode_inode_release %p cap %p %s -> "
3013
"%s\n", inode, cap,
3014
ceph_cap_string(cap->issued),
3015
ceph_cap_string(cap->issued & ~drop));
3016
cap->issued &= ~drop;
3017
cap->implemented &= ~drop;
3018
if (ci->i_ceph_flags & CEPH_I_NODELAY) {
3019
int wanted = __ceph_caps_wanted(ci);
3020
dout(" wanted %s -> %s (act %s)\n",
3021
ceph_cap_string(cap->mds_wanted),
3022
ceph_cap_string(cap->mds_wanted &
3023
~wanted),
3024
ceph_cap_string(wanted));
3025
cap->mds_wanted &= wanted;
3026
}
3027
} else {
3028
dout("encode_inode_release %p cap %p %s"
3029
" (force)\n", inode, cap,
3030
ceph_cap_string(cap->issued));
3031
}
3032
3033
rel->ino = cpu_to_le64(ceph_ino(inode));
3034
rel->cap_id = cpu_to_le64(cap->cap_id);
3035
rel->seq = cpu_to_le32(cap->seq);
3036
rel->issue_seq = cpu_to_le32(cap->issue_seq),
3037
rel->mseq = cpu_to_le32(cap->mseq);
3038
rel->caps = cpu_to_le32(cap->issued);
3039
rel->wanted = cpu_to_le32(cap->mds_wanted);
3040
rel->dname_len = 0;
3041
rel->dname_seq = 0;
3042
*p += sizeof(*rel);
3043
ret = 1;
3044
} else {
3045
dout("encode_inode_release %p cap %p %s\n",
3046
inode, cap, ceph_cap_string(cap->issued));
3047
}
3048
}
3049
spin_unlock(&inode->i_lock);
3050
return ret;
3051
}
3052
3053
int ceph_encode_dentry_release(void **p, struct dentry *dentry,
3054
int mds, int drop, int unless)
3055
{
3056
struct inode *dir = dentry->d_parent->d_inode;
3057
struct ceph_mds_request_release *rel = *p;
3058
struct ceph_dentry_info *di = ceph_dentry(dentry);
3059
int force = 0;
3060
int ret;
3061
3062
/*
3063
* force an record for the directory caps if we have a dentry lease.
3064
* this is racy (can't take i_lock and d_lock together), but it
3065
* doesn't have to be perfect; the mds will revoke anything we don't
3066
* release.
3067
*/
3068
spin_lock(&dentry->d_lock);
3069
if (di->lease_session && di->lease_session->s_mds == mds)
3070
force = 1;
3071
spin_unlock(&dentry->d_lock);
3072
3073
ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
3074
3075
spin_lock(&dentry->d_lock);
3076
if (ret && di->lease_session && di->lease_session->s_mds == mds) {
3077
dout("encode_dentry_release %p mds%d seq %d\n",
3078
dentry, mds, (int)di->lease_seq);
3079
rel->dname_len = cpu_to_le32(dentry->d_name.len);
3080
memcpy(*p, dentry->d_name.name, dentry->d_name.len);
3081
*p += dentry->d_name.len;
3082
rel->dname_seq = cpu_to_le32(di->lease_seq);
3083
__ceph_mdsc_drop_dentry_lease(dentry);
3084
}
3085
spin_unlock(&dentry->d_lock);
3086
return ret;
3087
}
3088
3089