Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
awilliam
GitHub Repository: awilliam/linux-vfio
Path: blob/master/drivers/block/rbd.c
15109 views
1
/*
2
rbd.c -- Export ceph rados objects as a Linux block device
3
4
5
based on drivers/block/osdblk.c:
6
7
Copyright 2009 Red Hat, Inc.
8
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation.
12
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
GNU General Public License for more details.
17
18
You should have received a copy of the GNU General Public License
19
along with this program; see the file COPYING. If not, write to
20
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24
For usage instructions, please refer to:
25
26
Documentation/ABI/testing/sysfs-bus-rbd
27
28
*/
29
30
#include <linux/ceph/libceph.h>
31
#include <linux/ceph/osd_client.h>
32
#include <linux/ceph/mon_client.h>
33
#include <linux/ceph/decode.h>
34
#include <linux/parser.h>
35
36
#include <linux/kernel.h>
37
#include <linux/device.h>
38
#include <linux/module.h>
39
#include <linux/fs.h>
40
#include <linux/blkdev.h>
41
42
#include "rbd_types.h"
43
44
/* Driver identification strings. */
#define DRV_NAME		"rbd"
#define DRV_NAME_LONG		"rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

/* Size limits for the names and option strings handled by the driver. */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head instead of a snapshot. */
#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32

/* Initial value of rbd_options.notify_timeout. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60
/*
61
* block device image metadata (in-memory version)
62
*/
63
struct rbd_image_header {
64
u64 image_size;
65
char block_name[32];
66
__u8 obj_order;
67
__u8 crypt_type;
68
__u8 comp_type;
69
struct rw_semaphore snap_rwsem;
70
struct ceph_snap_context *snapc;
71
size_t snap_names_len;
72
u64 snap_seq;
73
u32 total_snaps;
74
75
char *snap_names;
76
u64 *snap_sizes;
77
78
u64 obj_version;
79
};
80
81
/* rbd-specific options, filled in by parse_rbd_opts_token(). */
struct rbd_options {
	int notify_timeout;	/* starts as RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84
85
/*
86
* an instance of the client. multiple devices may share a client.
87
*/
88
struct rbd_client {
89
struct ceph_client *client;
90
struct rbd_options *rbd_opts;
91
struct kref kref;
92
struct list_head node;
93
};
94
95
struct rbd_req_coll;
96
97
/*
98
* a single io request
99
*/
100
struct rbd_request {
101
struct request *rq; /* blk layer request */
102
struct bio *bio; /* cloned bio */
103
struct page **pages; /* list of used pages */
104
u64 len;
105
int coll_index;
106
struct rbd_req_coll *coll;
107
};
108
109
struct rbd_req_status {
110
int done;
111
int rc;
112
u64 bytes;
113
};
114
115
/*
116
* a collection of requests
117
*/
118
struct rbd_req_coll {
119
int total;
120
int num_done;
121
struct kref kref;
122
struct rbd_req_status status[0];
123
};
124
125
struct rbd_snap {
126
struct device dev;
127
const char *name;
128
size_t size;
129
struct list_head node;
130
u64 id;
131
};
132
133
/*
134
* a single device
135
*/
136
struct rbd_device {
137
int id; /* blkdev unique id */
138
139
int major; /* blkdev assigned major */
140
struct gendisk *disk; /* blkdev's gendisk and rq */
141
struct request_queue *q;
142
143
struct ceph_client *client;
144
struct rbd_client *rbd_client;
145
146
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147
148
spinlock_t lock; /* queue lock */
149
150
struct rbd_image_header header;
151
char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152
int obj_len;
153
char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154
char pool_name[RBD_MAX_POOL_NAME_LEN];
155
int poolid;
156
157
struct ceph_osd_event *watch_event;
158
struct ceph_osd_request *watch_request;
159
160
char snap_name[RBD_MAX_SNAP_NAME_LEN];
161
u32 cur_snap; /* index+1 of current snapshot within snap context
162
0 - for the head */
163
int read_only;
164
165
struct list_head node;
166
167
/* list of snapshots */
168
struct list_head snaps;
169
170
/* sysfs related */
171
struct device dev;
172
};
173
174
static struct bus_type rbd_bus_type = {
175
.name = "rbd",
176
};
177
178
static spinlock_t node_lock; /* protects client get/put */
179
180
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
181
static LIST_HEAD(rbd_dev_list); /* devices */
182
static LIST_HEAD(rbd_client_list); /* clients */
183
184
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185
static void rbd_dev_release(struct device *dev);
186
static ssize_t rbd_snap_rollback(struct device *dev,
187
struct device_attribute *attr,
188
const char *buf,
189
size_t size);
190
static ssize_t rbd_snap_add(struct device *dev,
191
struct device_attribute *attr,
192
const char *buf,
193
size_t count);
194
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195
struct rbd_snap *snap);;
196
197
198
static struct rbd_device *dev_to_rbd(struct device *dev)
199
{
200
return container_of(dev, struct rbd_device, dev);
201
}
202
203
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204
{
205
return get_device(&rbd_dev->dev);
206
}
207
208
static void rbd_put_dev(struct rbd_device *rbd_dev)
209
{
210
put_device(&rbd_dev->dev);
211
}
212
213
static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214
215
static int rbd_open(struct block_device *bdev, fmode_t mode)
216
{
217
struct gendisk *disk = bdev->bd_disk;
218
struct rbd_device *rbd_dev = disk->private_data;
219
220
rbd_get_dev(rbd_dev);
221
222
set_device_ro(bdev, rbd_dev->read_only);
223
224
if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225
return -EROFS;
226
227
return 0;
228
}
229
230
/* Block device release handler: drops the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev;

	rbd_dev = disk->private_data;
	rbd_put_dev(rbd_dev);

	return 0;
}
238
239
static const struct block_device_operations rbd_bd_ops = {
240
.owner = THIS_MODULE,
241
.open = rbd_open,
242
.release = rbd_release,
243
};
244
245
/*
246
* Initialize an rbd client instance.
247
* We own *opt.
248
*/
249
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250
struct rbd_options *rbd_opts)
251
{
252
struct rbd_client *rbdc;
253
int ret = -ENOMEM;
254
255
dout("rbd_client_create\n");
256
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257
if (!rbdc)
258
goto out_opt;
259
260
kref_init(&rbdc->kref);
261
INIT_LIST_HEAD(&rbdc->node);
262
263
rbdc->client = ceph_create_client(opt, rbdc);
264
if (IS_ERR(rbdc->client))
265
goto out_rbdc;
266
opt = NULL; /* Now rbdc->client is responsible for opt */
267
268
ret = ceph_open_session(rbdc->client);
269
if (ret < 0)
270
goto out_err;
271
272
rbdc->rbd_opts = rbd_opts;
273
274
spin_lock(&node_lock);
275
list_add_tail(&rbdc->node, &rbd_client_list);
276
spin_unlock(&node_lock);
277
278
dout("rbd_client_create created %p\n", rbdc);
279
return rbdc;
280
281
out_err:
282
ceph_destroy_client(rbdc->client);
283
out_rbdc:
284
kfree(rbdc);
285
out_opt:
286
if (opt)
287
ceph_destroy_options(opt);
288
return ERR_PTR(ret);
289
}
290
291
/*
292
* Find a ceph client with specific addr and configuration.
293
*/
294
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295
{
296
struct rbd_client *client_node;
297
298
if (opt->flags & CEPH_OPT_NOSHARE)
299
return NULL;
300
301
list_for_each_entry(client_node, &rbd_client_list, node)
302
if (ceph_compare_options(opt, client_node->client) == 0)
303
return client_node;
304
return NULL;
305
}
306
307
/*
 * mount options
 *
 * Tokens below Opt_last_int carry an integer argument; tokens between
 * Opt_last_int and Opt_last_string carry a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,		/* int args above */
	Opt_last_string,	/* string args above */
};
317
318
static match_table_t rbdopt_tokens = {
319
{Opt_notify_timeout, "notify_timeout=%d"},
320
/* int args above */
321
/* string args above */
322
{-1, NULL}
323
};
324
325
static int parse_rbd_opts_token(char *c, void *private)
326
{
327
struct rbd_options *rbdopt = private;
328
substring_t argstr[MAX_OPT_ARGS];
329
int token, intval, ret;
330
331
token = match_token((char *)c, rbdopt_tokens, argstr);
332
if (token < 0)
333
return -EINVAL;
334
335
if (token < Opt_last_int) {
336
ret = match_int(&argstr[0], &intval);
337
if (ret < 0) {
338
pr_err("bad mount option arg (not int) "
339
"at '%s'\n", c);
340
return ret;
341
}
342
dout("got int token %d val %d\n", token, intval);
343
} else if (token > Opt_last_int && token < Opt_last_string) {
344
dout("got string token %d val %s\n", token,
345
argstr[0].from);
346
} else {
347
dout("got token %d\n", token);
348
}
349
350
switch (token) {
351
case Opt_notify_timeout:
352
rbdopt->notify_timeout = intval;
353
break;
354
default:
355
BUG_ON(token);
356
}
357
return 0;
358
}
359
360
/*
361
* Get a ceph client with specific addr and configuration, if one does
362
* not exist create it.
363
*/
364
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365
char *options)
366
{
367
struct rbd_client *rbdc;
368
struct ceph_options *opt;
369
int ret;
370
struct rbd_options *rbd_opts;
371
372
rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373
if (!rbd_opts)
374
return -ENOMEM;
375
376
rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
377
378
ret = ceph_parse_options(&opt, options, mon_addr,
379
mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
380
if (ret < 0)
381
goto done_err;
382
383
spin_lock(&node_lock);
384
rbdc = __rbd_client_find(opt);
385
if (rbdc) {
386
ceph_destroy_options(opt);
387
388
/* using an existing client */
389
kref_get(&rbdc->kref);
390
rbd_dev->rbd_client = rbdc;
391
rbd_dev->client = rbdc->client;
392
spin_unlock(&node_lock);
393
return 0;
394
}
395
spin_unlock(&node_lock);
396
397
rbdc = rbd_client_create(opt, rbd_opts);
398
if (IS_ERR(rbdc)) {
399
ret = PTR_ERR(rbdc);
400
goto done_err;
401
}
402
403
rbd_dev->rbd_client = rbdc;
404
rbd_dev->client = rbdc->client;
405
return 0;
406
done_err:
407
kfree(rbd_opts);
408
return ret;
409
}
410
411
/*
412
* Destroy ceph client
413
*/
414
static void rbd_client_release(struct kref *kref)
415
{
416
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417
418
dout("rbd_release_client %p\n", rbdc);
419
spin_lock(&node_lock);
420
list_del(&rbdc->node);
421
spin_unlock(&node_lock);
422
423
ceph_destroy_client(rbdc->client);
424
kfree(rbdc->rbd_opts);
425
kfree(rbdc);
426
}
427
428
/*
429
* Drop reference to ceph client node. If it's not referenced anymore, release
430
* it.
431
*/
432
static void rbd_put_client(struct rbd_device *rbd_dev)
433
{
434
kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435
rbd_dev->rbd_client = NULL;
436
rbd_dev->client = NULL;
437
}
438
439
/*
440
* Destroy requests collection
441
*/
442
static void rbd_coll_release(struct kref *kref)
443
{
444
struct rbd_req_coll *coll =
445
container_of(kref, struct rbd_req_coll, kref);
446
447
dout("rbd_coll_release %p\n", coll);
448
kfree(coll);
449
}
450
451
/*
452
* Create a new header structure, translate header format from the on-disk
453
* header.
454
*/
455
static int rbd_header_from_disk(struct rbd_image_header *header,
456
struct rbd_image_header_ondisk *ondisk,
457
int allocated_snaps,
458
gfp_t gfp_flags)
459
{
460
int i;
461
u32 snap_count = le32_to_cpu(ondisk->snap_count);
462
int ret = -ENOMEM;
463
464
init_rwsem(&header->snap_rwsem);
465
header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466
header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467
snap_count *
468
sizeof(struct rbd_image_snap_ondisk),
469
gfp_flags);
470
if (!header->snapc)
471
return -ENOMEM;
472
if (snap_count) {
473
header->snap_names = kmalloc(header->snap_names_len,
474
GFP_KERNEL);
475
if (!header->snap_names)
476
goto err_snapc;
477
header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478
GFP_KERNEL);
479
if (!header->snap_sizes)
480
goto err_names;
481
} else {
482
header->snap_names = NULL;
483
header->snap_sizes = NULL;
484
}
485
memcpy(header->block_name, ondisk->block_name,
486
sizeof(ondisk->block_name));
487
488
header->image_size = le64_to_cpu(ondisk->image_size);
489
header->obj_order = ondisk->options.order;
490
header->crypt_type = ondisk->options.crypt_type;
491
header->comp_type = ondisk->options.comp_type;
492
493
atomic_set(&header->snapc->nref, 1);
494
header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495
header->snapc->num_snaps = snap_count;
496
header->total_snaps = snap_count;
497
498
if (snap_count &&
499
allocated_snaps == snap_count) {
500
for (i = 0; i < snap_count; i++) {
501
header->snapc->snaps[i] =
502
le64_to_cpu(ondisk->snaps[i].id);
503
header->snap_sizes[i] =
504
le64_to_cpu(ondisk->snaps[i].image_size);
505
}
506
507
/* copy snapshot names */
508
memcpy(header->snap_names, &ondisk->snaps[i],
509
header->snap_names_len);
510
}
511
512
return 0;
513
514
err_names:
515
kfree(header->snap_names);
516
err_snapc:
517
kfree(header->snapc);
518
return ret;
519
}
520
521
static int snap_index(struct rbd_image_header *header, int snap_num)
522
{
523
return header->total_snaps - snap_num;
524
}
525
526
static u64 cur_snap_id(struct rbd_device *rbd_dev)
527
{
528
struct rbd_image_header *header = &rbd_dev->header;
529
530
if (!rbd_dev->cur_snap)
531
return 0;
532
533
return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534
}
535
536
/*
 * Look up a snapshot by name in the header's packed name list.
 *
 * On a match, stores the snapshot id in *seq and its size in *size
 * (each only if the pointer is non-NULL) and returns the snapshot's
 * index.  Returns -ENOENT when no snapshot has that name.
 */
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	char *name = header->snap_names;
	int which;

	for (which = 0; which < header->total_snaps; which++) {
		if (strcmp(snap_name, name) == 0) {
			if (seq)
				*seq = header->snapc->snaps[which];
			if (size)
				*size = header->snap_sizes[which];
			return which;
		}
		/* names are stored back to back, NUL-terminated */
		name += strlen(name) + 1;
	}

	return -ENOENT;
}
556
557
static int rbd_header_set_snap(struct rbd_device *dev,
558
const char *snap_name,
559
u64 *size)
560
{
561
struct rbd_image_header *header = &dev->header;
562
struct ceph_snap_context *snapc = header->snapc;
563
int ret = -ENOENT;
564
565
down_write(&header->snap_rwsem);
566
567
if (!snap_name ||
568
!*snap_name ||
569
strcmp(snap_name, "-") == 0 ||
570
strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571
if (header->total_snaps)
572
snapc->seq = header->snap_seq;
573
else
574
snapc->seq = 0;
575
dev->cur_snap = 0;
576
dev->read_only = 0;
577
if (size)
578
*size = header->image_size;
579
} else {
580
ret = snap_by_name(header, snap_name, &snapc->seq, size);
581
if (ret < 0)
582
goto done;
583
584
dev->cur_snap = header->total_snaps - ret;
585
dev->read_only = 1;
586
}
587
588
ret = 0;
589
done:
590
up_write(&header->snap_rwsem);
591
return ret;
592
}
593
594
static void rbd_header_free(struct rbd_image_header *header)
595
{
596
kfree(header->snapc);
597
kfree(header->snap_names);
598
kfree(header->snap_sizes);
599
}
600
601
/*
602
* get the actual striped segment name, offset and length
603
*/
604
static u64 rbd_get_segment(struct rbd_image_header *header,
605
const char *block_name,
606
u64 ofs, u64 len,
607
char *seg_name, u64 *segofs)
608
{
609
u64 seg = ofs >> header->obj_order;
610
611
if (seg_name)
612
snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613
"%s.%012llx", block_name, seg);
614
615
ofs = ofs & ((1 << header->obj_order) - 1);
616
len = min_t(u64, len, (1 << header->obj_order) - ofs);
617
618
if (segofs)
619
*segofs = ofs;
620
621
return len;
622
}
623
624
/*
 * Number of segments touched by the byte range [ofs, ofs + len).
 * NOTE(review): len == 0 makes "ofs + len - 1" wrap; callers presumably
 * never pass an empty range - confirm.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 first = ofs >> header->obj_order;
	u64 last = (ofs + len - 1) >> header->obj_order;

	return last - first + 1;
}
631
632
/*
633
* bio helpers
634
*/
635
636
static void bio_chain_put(struct bio *chain)
637
{
638
struct bio *tmp;
639
640
while (chain) {
641
tmp = chain;
642
chain = chain->bi_next;
643
bio_put(tmp);
644
}
645
}
646
647
/*
648
* zeros a bio chain, starting at specific offset
649
*/
650
static void zero_bio_chain(struct bio *chain, int start_ofs)
651
{
652
struct bio_vec *bv;
653
unsigned long flags;
654
void *buf;
655
int i;
656
int pos = 0;
657
658
while (chain) {
659
bio_for_each_segment(bv, chain, i) {
660
if (pos + bv->bv_len > start_ofs) {
661
int remainder = max(start_ofs - pos, 0);
662
buf = bvec_kmap_irq(bv, &flags);
663
memset(buf + remainder, 0,
664
bv->bv_len - remainder);
665
bvec_kunmap_irq(buf, &flags);
666
}
667
pos += bv->bv_len;
668
}
669
670
chain = chain->bi_next;
671
}
672
}
673
674
/*
675
* bio_chain_clone - clone a chain of bios up to a certain length.
676
* might return a bio_pair that will need to be released.
677
*/
678
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679
struct bio_pair **bp,
680
int len, gfp_t gfpmask)
681
{
682
struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683
int total = 0;
684
685
if (*bp) {
686
bio_pair_release(*bp);
687
*bp = NULL;
688
}
689
690
while (old_chain && (total < len)) {
691
tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692
if (!tmp)
693
goto err_out;
694
695
if (total + old_chain->bi_size > len) {
696
struct bio_pair *bp;
697
698
/*
699
* this split can only happen with a single paged bio,
700
* split_bio will BUG_ON if this is not the case
701
*/
702
dout("bio_chain_clone split! total=%d remaining=%d"
703
"bi_size=%d\n",
704
(int)total, (int)len-total,
705
(int)old_chain->bi_size);
706
707
/* split the bio. We'll release it either in the next
708
call, or it will have to be released outside */
709
bp = bio_split(old_chain, (len - total) / 512ULL);
710
if (!bp)
711
goto err_out;
712
713
__bio_clone(tmp, &bp->bio1);
714
715
*next = &bp->bio2;
716
} else {
717
__bio_clone(tmp, old_chain);
718
*next = old_chain->bi_next;
719
}
720
721
tmp->bi_bdev = NULL;
722
gfpmask &= ~__GFP_WAIT;
723
tmp->bi_next = NULL;
724
725
if (!new_chain) {
726
new_chain = tail = tmp;
727
} else {
728
tail->bi_next = tmp;
729
tail = tmp;
730
}
731
old_chain = old_chain->bi_next;
732
733
total += tmp->bi_size;
734
}
735
736
BUG_ON(total < len);
737
738
if (tail)
739
tail->bi_next = NULL;
740
741
*old = old_chain;
742
743
return new_chain;
744
745
err_out:
746
dout("bio_chain_clone with err\n");
747
bio_chain_put(new_chain);
748
return NULL;
749
}
750
751
/*
752
* helpers for osd request op vectors.
753
*/
754
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755
int num_ops,
756
int opcode,
757
u32 payload_len)
758
{
759
*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760
GFP_NOIO);
761
if (!*ops)
762
return -ENOMEM;
763
(*ops)[0].op = opcode;
764
/*
765
* op extent offset and length will be set later on
766
* in calc_raw_layout()
767
*/
768
(*ops)[0].payload_len = payload_len;
769
return 0;
770
}
771
772
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	if (ops)
		kfree(ops);
}
776
777
/*
 * Record completion of member 'index' of a request collection and
 * report to the block layer every member that is now complete and
 * contiguous with those already reported, in index order.
 *
 * Does nothing without a block request; without a collection the
 * block request is ended directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	struct rbd_req_status *slot;
	int first, last, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);

	slot = &coll->status[index];
	slot->done = 1;
	slot->rc = ret;
	slot->bytes = len;

	/* find the run of finished members starting at num_done */
	first = coll->num_done;
	last = first;
	while (last < coll->total && coll->status[last].done)
		last++;

	/* report the run; one collection reference is dropped per member */
	for (i = first; i < last; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}

	spin_unlock_irq(q->queue_lock);
}
814
815
/* Complete an rbd_request through its collection slot. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	struct request *rq = req->rq;
	struct rbd_req_coll *coll = req->coll;

	rbd_coll_end_req_index(rq, coll, req->coll_index, ret, len);
}
820
821
/*
822
* Send ceph osd request
823
*/
824
static int rbd_do_request(struct request *rq,
825
struct rbd_device *dev,
826
struct ceph_snap_context *snapc,
827
u64 snapid,
828
const char *obj, u64 ofs, u64 len,
829
struct bio *bio,
830
struct page **pages,
831
int num_pages,
832
int flags,
833
struct ceph_osd_req_op *ops,
834
int num_reply,
835
struct rbd_req_coll *coll,
836
int coll_index,
837
void (*rbd_cb)(struct ceph_osd_request *req,
838
struct ceph_msg *msg),
839
struct ceph_osd_request **linger_req,
840
u64 *ver)
841
{
842
struct ceph_osd_request *req;
843
struct ceph_file_layout *layout;
844
int ret;
845
u64 bno;
846
struct timespec mtime = CURRENT_TIME;
847
struct rbd_request *req_data;
848
struct ceph_osd_request_head *reqhead;
849
struct rbd_image_header *header = &dev->header;
850
851
req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
852
if (!req_data) {
853
if (coll)
854
rbd_coll_end_req_index(rq, coll, coll_index,
855
-ENOMEM, len);
856
return -ENOMEM;
857
}
858
859
if (coll) {
860
req_data->coll = coll;
861
req_data->coll_index = coll_index;
862
}
863
864
dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
865
866
down_read(&header->snap_rwsem);
867
868
req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869
snapc,
870
ops,
871
false,
872
GFP_NOIO, pages, bio);
873
if (!req) {
874
up_read(&header->snap_rwsem);
875
ret = -ENOMEM;
876
goto done_pages;
877
}
878
879
req->r_callback = rbd_cb;
880
881
req_data->rq = rq;
882
req_data->bio = bio;
883
req_data->pages = pages;
884
req_data->len = len;
885
886
req->r_priv = req_data;
887
888
reqhead = req->r_request->front.iov_base;
889
reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
890
891
strncpy(req->r_oid, obj, sizeof(req->r_oid));
892
req->r_oid_len = strlen(req->r_oid);
893
894
layout = &req->r_file_layout;
895
memset(layout, 0, sizeof(*layout));
896
layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897
layout->fl_stripe_count = cpu_to_le32(1);
898
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899
layout->fl_pg_preferred = cpu_to_le32(-1);
900
layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901
ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902
ofs, &len, &bno, req, ops);
903
904
ceph_osdc_build_request(req, ofs, &len,
905
ops,
906
snapc,
907
&mtime,
908
req->r_oid, req->r_oid_len);
909
up_read(&header->snap_rwsem);
910
911
if (linger_req) {
912
ceph_osdc_set_request_linger(&dev->client->osdc, req);
913
*linger_req = req;
914
}
915
916
ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917
if (ret < 0)
918
goto done_err;
919
920
if (!rbd_cb) {
921
ret = ceph_osdc_wait_request(&dev->client->osdc, req);
922
if (ver)
923
*ver = le64_to_cpu(req->r_reassert_version.version);
924
dout("reassert_ver=%lld\n",
925
le64_to_cpu(req->r_reassert_version.version));
926
ceph_osdc_put_request(req);
927
}
928
return ret;
929
930
done_err:
931
bio_chain_put(req_data->bio);
932
ceph_osdc_put_request(req);
933
done_pages:
934
rbd_coll_end_req(req_data, ret, len);
935
kfree(req_data);
936
return ret;
937
}
938
939
/*
940
* Ceph osd op callback
941
*/
942
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
943
{
944
struct rbd_request *req_data = req->r_priv;
945
struct ceph_osd_reply_head *replyhead;
946
struct ceph_osd_op *op;
947
__s32 rc;
948
u64 bytes;
949
int read_op;
950
951
/* parse reply */
952
replyhead = msg->front.iov_base;
953
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954
op = (void *)(replyhead + 1);
955
rc = le32_to_cpu(replyhead->result);
956
bytes = le64_to_cpu(op->extent.length);
957
read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
958
959
dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
960
961
if (rc == -ENOENT && read_op) {
962
zero_bio_chain(req_data->bio, 0);
963
rc = 0;
964
} else if (rc == 0 && read_op && bytes < req_data->len) {
965
zero_bio_chain(req_data->bio, bytes);
966
bytes = req_data->len;
967
}
968
969
rbd_coll_end_req(req_data, rc, bytes);
970
971
if (req_data->bio)
972
bio_chain_put(req_data->bio);
973
974
ceph_osdc_put_request(req);
975
kfree(req_data);
976
}
977
978
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
979
{
980
ceph_osdc_put_request(req);
981
}
982
983
/*
984
* Do a synchronous ceph osd operation
985
*/
986
static int rbd_req_sync_op(struct rbd_device *dev,
987
struct ceph_snap_context *snapc,
988
u64 snapid,
989
int opcode,
990
int flags,
991
struct ceph_osd_req_op *orig_ops,
992
int num_reply,
993
const char *obj,
994
u64 ofs, u64 len,
995
char *buf,
996
struct ceph_osd_request **linger_req,
997
u64 *ver)
998
{
999
int ret;
1000
struct page **pages;
1001
int num_pages;
1002
struct ceph_osd_req_op *ops = orig_ops;
1003
u32 payload_len;
1004
1005
num_pages = calc_pages_for(ofs , len);
1006
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1007
if (IS_ERR(pages))
1008
return PTR_ERR(pages);
1009
1010
if (!orig_ops) {
1011
payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012
ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013
if (ret < 0)
1014
goto done;
1015
1016
if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017
ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018
if (ret < 0)
1019
goto done_ops;
1020
}
1021
}
1022
1023
ret = rbd_do_request(NULL, dev, snapc, snapid,
1024
obj, ofs, len, NULL,
1025
pages, num_pages,
1026
flags,
1027
ops,
1028
2,
1029
NULL, 0,
1030
NULL,
1031
linger_req, ver);
1032
if (ret < 0)
1033
goto done_ops;
1034
1035
if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036
ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1037
1038
done_ops:
1039
if (!orig_ops)
1040
rbd_destroy_ops(ops);
1041
done:
1042
ceph_release_page_vector(pages, num_pages);
1043
return ret;
1044
}
1045
1046
/*
1047
* Do an asynchronous ceph osd operation
1048
*/
1049
static int rbd_do_op(struct request *rq,
1050
struct rbd_device *rbd_dev ,
1051
struct ceph_snap_context *snapc,
1052
u64 snapid,
1053
int opcode, int flags, int num_reply,
1054
u64 ofs, u64 len,
1055
struct bio *bio,
1056
struct rbd_req_coll *coll,
1057
int coll_index)
1058
{
1059
char *seg_name;
1060
u64 seg_ofs;
1061
u64 seg_len;
1062
int ret;
1063
struct ceph_osd_req_op *ops;
1064
u32 payload_len;
1065
1066
seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067
if (!seg_name)
1068
return -ENOMEM;
1069
1070
seg_len = rbd_get_segment(&rbd_dev->header,
1071
rbd_dev->header.block_name,
1072
ofs, len,
1073
seg_name, &seg_ofs);
1074
1075
payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1076
1077
ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078
if (ret < 0)
1079
goto done;
1080
1081
/* we've taken care of segment sizes earlier when we
1082
cloned the bios. We should never have a segment
1083
truncated at this point */
1084
BUG_ON(seg_len < len);
1085
1086
ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087
seg_name, seg_ofs, seg_len,
1088
bio,
1089
NULL, 0,
1090
flags,
1091
ops,
1092
num_reply,
1093
coll, coll_index,
1094
rbd_req_cb, 0, NULL);
1095
1096
rbd_destroy_ops(ops);
1097
done:
1098
kfree(seg_name);
1099
return ret;
1100
}
1101
1102
/*
1103
* Request async osd write
1104
*/
1105
static int rbd_req_write(struct request *rq,
1106
struct rbd_device *rbd_dev,
1107
struct ceph_snap_context *snapc,
1108
u64 ofs, u64 len,
1109
struct bio *bio,
1110
struct rbd_req_coll *coll,
1111
int coll_index)
1112
{
1113
return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114
CEPH_OSD_OP_WRITE,
1115
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1116
2,
1117
ofs, len, bio, coll, coll_index);
1118
}
1119
1120
/*
1121
* Request async osd read
1122
*/
1123
static int rbd_req_read(struct request *rq,
1124
struct rbd_device *rbd_dev,
1125
u64 snapid,
1126
u64 ofs, u64 len,
1127
struct bio *bio,
1128
struct rbd_req_coll *coll,
1129
int coll_index)
1130
{
1131
return rbd_do_op(rq, rbd_dev, NULL,
1132
(snapid ? snapid : CEPH_NOSNAP),
1133
CEPH_OSD_OP_READ,
1134
CEPH_OSD_FLAG_READ,
1135
2,
1136
ofs, len, bio, coll, coll_index);
1137
}
1138
1139
/*
1140
* Request sync osd read
1141
*/
1142
static int rbd_req_sync_read(struct rbd_device *dev,
1143
struct ceph_snap_context *snapc,
1144
u64 snapid,
1145
const char *obj,
1146
u64 ofs, u64 len,
1147
char *buf,
1148
u64 *ver)
1149
{
1150
return rbd_req_sync_op(dev, NULL,
1151
(snapid ? snapid : CEPH_NOSNAP),
1152
CEPH_OSD_OP_READ,
1153
CEPH_OSD_FLAG_READ,
1154
NULL,
1155
1, obj, ofs, len, buf, NULL, ver);
1156
}
1157
1158
/*
1159
* Request sync osd watch
1160
*/
1161
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162
u64 ver,
1163
u64 notify_id,
1164
const char *obj)
1165
{
1166
struct ceph_osd_req_op *ops;
1167
struct page **pages = NULL;
1168
int ret;
1169
1170
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1171
if (ret < 0)
1172
return ret;
1173
1174
ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175
ops[0].watch.cookie = notify_id;
1176
ops[0].watch.flag = 0;
1177
1178
ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179
obj, 0, 0, NULL,
1180
pages, 0,
1181
CEPH_OSD_FLAG_READ,
1182
ops,
1183
1,
1184
NULL, 0,
1185
rbd_simple_req_cb, 0, NULL);
1186
1187
rbd_destroy_ops(ops);
1188
return ret;
1189
}
1190
1191
/*
 * Watch event callback for the image header object: refresh the
 * snapshot information under ctl_mutex, then acknowledge the
 * notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = data;
	int err;

	if (rbd_dev == NULL)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
	     rbd_dev->obj_md_name, notify_id, (int)opcode);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	err = __rbd_update_snaps(rbd_dev);
	mutex_unlock(&ctl_mutex);

	if (err != 0)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", rbd_dev->major, err);

	/* the ack is sent even when the refresh failed */
	rbd_req_sync_notify_ack(rbd_dev, ver, notify_id,
				rbd_dev->obj_md_name);
}
1210
1211
/*
1212
* Request sync osd watch
1213
*/
1214
static int rbd_req_sync_watch(struct rbd_device *dev,
1215
const char *obj,
1216
u64 ver)
1217
{
1218
struct ceph_osd_req_op *ops;
1219
struct ceph_osd_client *osdc = &dev->client->osdc;
1220
1221
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1222
if (ret < 0)
1223
return ret;
1224
1225
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1226
(void *)dev, &dev->watch_event);
1227
if (ret < 0)
1228
goto fail;
1229
1230
ops[0].watch.ver = cpu_to_le64(ver);
1231
ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1232
ops[0].watch.flag = 1;
1233
1234
ret = rbd_req_sync_op(dev, NULL,
1235
CEPH_NOSNAP,
1236
0,
1237
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1238
ops,
1239
1, obj, 0, 0, NULL,
1240
&dev->watch_request, NULL);
1241
1242
if (ret < 0)
1243
goto fail_event;
1244
1245
rbd_destroy_ops(ops);
1246
return 0;
1247
1248
fail_event:
1249
ceph_osdc_cancel_event(dev->watch_event);
1250
dev->watch_event = NULL;
1251
fail:
1252
rbd_destroy_ops(ops);
1253
return ret;
1254
}
1255
1256
/* Context handed to the notify event created in rbd_req_sync_notify(). */
struct rbd_notify_info {
	struct rbd_device *dev;		/* device that sent the notify */
};
1259
1260
/*
 * Event callback for a notify sent by rbd_req_sync_notify(); it only
 * logs the event.
 *
 * 'data' is the struct rbd_notify_info registered with the event, not
 * the rbd_device itself, so the device is reached through info->dev.
 */
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_notify_info *info = data;
	struct rbd_device *dev;

	if (!info || !info->dev)
		return;
	dev = info->dev;

	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
}
1269
1270
/*
1271
* Request sync osd notify
1272
*/
1273
static int rbd_req_sync_notify(struct rbd_device *dev,
1274
const char *obj)
1275
{
1276
struct ceph_osd_req_op *ops;
1277
struct ceph_osd_client *osdc = &dev->client->osdc;
1278
struct ceph_osd_event *event;
1279
struct rbd_notify_info info;
1280
int payload_len = sizeof(u32) + sizeof(u32);
1281
int ret;
1282
1283
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1284
if (ret < 0)
1285
return ret;
1286
1287
info.dev = dev;
1288
1289
ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1290
(void *)&info, &event);
1291
if (ret < 0)
1292
goto fail;
1293
1294
ops[0].watch.ver = 1;
1295
ops[0].watch.flag = 1;
1296
ops[0].watch.cookie = event->cookie;
1297
ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1298
ops[0].watch.timeout = 12;
1299
1300
ret = rbd_req_sync_op(dev, NULL,
1301
CEPH_NOSNAP,
1302
0,
1303
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1304
ops,
1305
1, obj, 0, 0, NULL, NULL, NULL);
1306
if (ret < 0)
1307
goto fail_event;
1308
1309
ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1310
dout("ceph_osdc_wait_event returned %d\n", ret);
1311
rbd_destroy_ops(ops);
1312
return 0;
1313
1314
fail_event:
1315
ceph_osdc_cancel_event(event);
1316
fail:
1317
rbd_destroy_ops(ops);
1318
return ret;
1319
}
1320
1321
/*
1322
* Request sync osd rollback
1323
*/
1324
static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1325
u64 snapid,
1326
const char *obj)
1327
{
1328
struct ceph_osd_req_op *ops;
1329
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1330
if (ret < 0)
1331
return ret;
1332
1333
ops[0].snap.snapid = snapid;
1334
1335
ret = rbd_req_sync_op(dev, NULL,
1336
CEPH_NOSNAP,
1337
0,
1338
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1339
ops,
1340
1, obj, 0, 0, NULL, NULL, NULL);
1341
1342
rbd_destroy_ops(ops);
1343
1344
return ret;
1345
}
1346
1347
/*
1348
* Request sync osd read
1349
*/
1350
static int rbd_req_sync_exec(struct rbd_device *dev,
1351
const char *obj,
1352
const char *cls,
1353
const char *method,
1354
const char *data,
1355
int len,
1356
u64 *ver)
1357
{
1358
struct ceph_osd_req_op *ops;
1359
int cls_len = strlen(cls);
1360
int method_len = strlen(method);
1361
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1362
cls_len + method_len + len);
1363
if (ret < 0)
1364
return ret;
1365
1366
ops[0].cls.class_name = cls;
1367
ops[0].cls.class_len = (__u8)cls_len;
1368
ops[0].cls.method_name = method;
1369
ops[0].cls.method_len = (__u8)method_len;
1370
ops[0].cls.argc = 0;
1371
ops[0].cls.indata = data;
1372
ops[0].cls.indata_len = len;
1373
1374
ret = rbd_req_sync_op(dev, NULL,
1375
CEPH_NOSNAP,
1376
0,
1377
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378
ops,
1379
1, obj, 0, 0, NULL, NULL, ver);
1380
1381
rbd_destroy_ops(ops);
1382
1383
dout("cls_exec returned %d\n", ret);
1384
return ret;
1385
}
1386
1387
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1388
{
1389
struct rbd_req_coll *coll =
1390
kzalloc(sizeof(struct rbd_req_coll) +
1391
sizeof(struct rbd_req_status) * num_reqs,
1392
GFP_ATOMIC);
1393
1394
if (!coll)
1395
return NULL;
1396
coll->total = num_reqs;
1397
kref_init(&coll->kref);
1398
return coll;
1399
}
1400
1401
/*
1402
* block device queue callback
1403
*/
1404
static void rbd_rq_fn(struct request_queue *q)
1405
{
1406
struct rbd_device *rbd_dev = q->queuedata;
1407
struct request *rq;
1408
struct bio_pair *bp = NULL;
1409
1410
rq = blk_fetch_request(q);
1411
1412
while (1) {
1413
struct bio *bio;
1414
struct bio *rq_bio, *next_bio = NULL;
1415
bool do_write;
1416
int size, op_size = 0;
1417
u64 ofs;
1418
int num_segs, cur_seg = 0;
1419
struct rbd_req_coll *coll;
1420
1421
/* peek at request from block layer */
1422
if (!rq)
1423
break;
1424
1425
dout("fetched request\n");
1426
1427
/* filter out block requests we don't understand */
1428
if ((rq->cmd_type != REQ_TYPE_FS)) {
1429
__blk_end_request_all(rq, 0);
1430
goto next;
1431
}
1432
1433
/* deduce our operation (read, write) */
1434
do_write = (rq_data_dir(rq) == WRITE);
1435
1436
size = blk_rq_bytes(rq);
1437
ofs = blk_rq_pos(rq) * 512ULL;
1438
rq_bio = rq->bio;
1439
if (do_write && rbd_dev->read_only) {
1440
__blk_end_request_all(rq, -EROFS);
1441
goto next;
1442
}
1443
1444
spin_unlock_irq(q->queue_lock);
1445
1446
dout("%s 0x%x bytes at 0x%llx\n",
1447
do_write ? "write" : "read",
1448
size, blk_rq_pos(rq) * 512ULL);
1449
1450
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1451
coll = rbd_alloc_coll(num_segs);
1452
if (!coll) {
1453
spin_lock_irq(q->queue_lock);
1454
__blk_end_request_all(rq, -ENOMEM);
1455
goto next;
1456
}
1457
1458
do {
1459
/* a bio clone to be passed down to OSD req */
1460
dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1461
op_size = rbd_get_segment(&rbd_dev->header,
1462
rbd_dev->header.block_name,
1463
ofs, size,
1464
NULL, NULL);
1465
kref_get(&coll->kref);
1466
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1467
op_size, GFP_ATOMIC);
1468
if (!bio) {
1469
rbd_coll_end_req_index(rq, coll, cur_seg,
1470
-ENOMEM, op_size);
1471
goto next_seg;
1472
}
1473
1474
1475
/* init OSD command: write or read */
1476
if (do_write)
1477
rbd_req_write(rq, rbd_dev,
1478
rbd_dev->header.snapc,
1479
ofs,
1480
op_size, bio,
1481
coll, cur_seg);
1482
else
1483
rbd_req_read(rq, rbd_dev,
1484
cur_snap_id(rbd_dev),
1485
ofs,
1486
op_size, bio,
1487
coll, cur_seg);
1488
1489
next_seg:
1490
size -= op_size;
1491
ofs += op_size;
1492
1493
cur_seg++;
1494
rq_bio = next_bio;
1495
} while (size > 0);
1496
kref_put(&coll->kref, rbd_coll_release);
1497
1498
if (bp)
1499
bio_pair_release(bp);
1500
spin_lock_irq(q->queue_lock);
1501
next:
1502
rq = blk_fetch_request(q);
1503
}
1504
}
1505
1506
/*
1507
* a queue callback. Makes sure that we don't create a bio that spans across
1508
* multiple osd objects. One exception would be with a single page bios,
1509
* which we handle later at bio_chain_clone
1510
*/
1511
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1512
struct bio_vec *bvec)
1513
{
1514
struct rbd_device *rbd_dev = q->queuedata;
1515
unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1516
sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1517
unsigned int bio_sectors = bmd->bi_size >> 9;
1518
int max;
1519
1520
max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1521
+ bio_sectors)) << 9;
1522
if (max < 0)
1523
max = 0; /* bio_add cannot handle a negative return */
1524
if (max <= bvec->bv_len && bio_sectors == 0)
1525
return bvec->bv_len;
1526
return max;
1527
}
1528
1529
static void rbd_free_disk(struct rbd_device *rbd_dev)
1530
{
1531
struct gendisk *disk = rbd_dev->disk;
1532
1533
if (!disk)
1534
return;
1535
1536
rbd_header_free(&rbd_dev->header);
1537
1538
if (disk->flags & GENHD_FL_UP)
1539
del_gendisk(disk);
1540
if (disk->queue)
1541
blk_cleanup_queue(disk->queue);
1542
put_disk(disk);
1543
}
1544
1545
/*
1546
* reload the ondisk the header
1547
*/
1548
static int rbd_read_header(struct rbd_device *rbd_dev,
1549
struct rbd_image_header *header)
1550
{
1551
ssize_t rc;
1552
struct rbd_image_header_ondisk *dh;
1553
int snap_count = 0;
1554
u64 snap_names_len = 0;
1555
u64 ver;
1556
1557
while (1) {
1558
int len = sizeof(*dh) +
1559
snap_count * sizeof(struct rbd_image_snap_ondisk) +
1560
snap_names_len;
1561
1562
rc = -ENOMEM;
1563
dh = kmalloc(len, GFP_KERNEL);
1564
if (!dh)
1565
return -ENOMEM;
1566
1567
rc = rbd_req_sync_read(rbd_dev,
1568
NULL, CEPH_NOSNAP,
1569
rbd_dev->obj_md_name,
1570
0, len,
1571
(char *)dh, &ver);
1572
if (rc < 0)
1573
goto out_dh;
1574
1575
rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1576
if (rc < 0)
1577
goto out_dh;
1578
1579
if (snap_count != header->total_snaps) {
1580
snap_count = header->total_snaps;
1581
snap_names_len = header->snap_names_len;
1582
rbd_header_free(header);
1583
kfree(dh);
1584
continue;
1585
}
1586
break;
1587
}
1588
header->obj_version = ver;
1589
1590
out_dh:
1591
kfree(dh);
1592
return rc;
1593
}
1594
1595
/*
1596
* create a snapshot
1597
*/
1598
static int rbd_header_add_snap(struct rbd_device *dev,
1599
const char *snap_name,
1600
gfp_t gfp_flags)
1601
{
1602
int name_len = strlen(snap_name);
1603
u64 new_snapid;
1604
int ret;
1605
void *data, *p, *e;
1606
u64 ver;
1607
1608
/* we should create a snapshot only if we're pointing at the head */
1609
if (dev->cur_snap)
1610
return -EINVAL;
1611
1612
ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1613
&new_snapid);
1614
dout("created snapid=%lld\n", new_snapid);
1615
if (ret < 0)
1616
return ret;
1617
1618
data = kmalloc(name_len + 16, gfp_flags);
1619
if (!data)
1620
return -ENOMEM;
1621
1622
p = data;
1623
e = data + name_len + 16;
1624
1625
ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1626
ceph_encode_64_safe(&p, e, new_snapid, bad);
1627
1628
ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1629
data, p - data, &ver);
1630
1631
kfree(data);
1632
1633
if (ret < 0)
1634
return ret;
1635
1636
dev->header.snapc->seq = new_snapid;
1637
1638
return 0;
1639
bad:
1640
return -ERANGE;
1641
}
1642
1643
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1644
{
1645
struct rbd_snap *snap;
1646
1647
while (!list_empty(&rbd_dev->snaps)) {
1648
snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1649
__rbd_remove_snap_dev(rbd_dev, snap);
1650
}
1651
}
1652
1653
/*
1654
* only read the first part of the ondisk header, without the snaps info
1655
*/
1656
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1657
{
1658
int ret;
1659
struct rbd_image_header h;
1660
u64 snap_seq;
1661
int follow_seq = 0;
1662
1663
ret = rbd_read_header(rbd_dev, &h);
1664
if (ret < 0)
1665
return ret;
1666
1667
/* resized? */
1668
set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1669
1670
down_write(&rbd_dev->header.snap_rwsem);
1671
1672
snap_seq = rbd_dev->header.snapc->seq;
1673
if (rbd_dev->header.total_snaps &&
1674
rbd_dev->header.snapc->snaps[0] == snap_seq)
1675
/* pointing at the head, will need to follow that
1676
if head moves */
1677
follow_seq = 1;
1678
1679
kfree(rbd_dev->header.snapc);
1680
kfree(rbd_dev->header.snap_names);
1681
kfree(rbd_dev->header.snap_sizes);
1682
1683
rbd_dev->header.total_snaps = h.total_snaps;
1684
rbd_dev->header.snapc = h.snapc;
1685
rbd_dev->header.snap_names = h.snap_names;
1686
rbd_dev->header.snap_names_len = h.snap_names_len;
1687
rbd_dev->header.snap_sizes = h.snap_sizes;
1688
if (follow_seq)
1689
rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1690
else
1691
rbd_dev->header.snapc->seq = snap_seq;
1692
1693
ret = __rbd_init_snaps_header(rbd_dev);
1694
1695
up_write(&rbd_dev->header.snap_rwsem);
1696
1697
return ret;
1698
}
1699
1700
static int rbd_init_disk(struct rbd_device *rbd_dev)
1701
{
1702
struct gendisk *disk;
1703
struct request_queue *q;
1704
int rc;
1705
u64 total_size = 0;
1706
1707
/* contact OSD, request size info about the object being mapped */
1708
rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1709
if (rc)
1710
return rc;
1711
1712
/* no need to lock here, as rbd_dev is not registered yet */
1713
rc = __rbd_init_snaps_header(rbd_dev);
1714
if (rc)
1715
return rc;
1716
1717
rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1718
if (rc)
1719
return rc;
1720
1721
/* create gendisk info */
1722
rc = -ENOMEM;
1723
disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1724
if (!disk)
1725
goto out;
1726
1727
snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1728
rbd_dev->id);
1729
disk->major = rbd_dev->major;
1730
disk->first_minor = 0;
1731
disk->fops = &rbd_bd_ops;
1732
disk->private_data = rbd_dev;
1733
1734
/* init rq */
1735
rc = -ENOMEM;
1736
q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1737
if (!q)
1738
goto out_disk;
1739
blk_queue_merge_bvec(q, rbd_merge_bvec);
1740
disk->queue = q;
1741
1742
q->queuedata = rbd_dev;
1743
1744
rbd_dev->disk = disk;
1745
rbd_dev->q = q;
1746
1747
/* finally, announce the disk to the world */
1748
set_capacity(disk, total_size / 512ULL);
1749
add_disk(disk);
1750
1751
pr_info("%s: added with size 0x%llx\n",
1752
disk->disk_name, (unsigned long long)total_size);
1753
return 0;
1754
1755
out_disk:
1756
put_disk(disk);
1757
out:
1758
return rc;
1759
}
1760
1761
/*
 * sysfs: attributes of a mapped rbd device
 */
1764
1765
static ssize_t rbd_size_show(struct device *dev,
1766
struct device_attribute *attr, char *buf)
1767
{
1768
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1769
1770
return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1771
}
1772
1773
static ssize_t rbd_major_show(struct device *dev,
1774
struct device_attribute *attr, char *buf)
1775
{
1776
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1777
1778
return sprintf(buf, "%d\n", rbd_dev->major);
1779
}
1780
1781
static ssize_t rbd_client_id_show(struct device *dev,
1782
struct device_attribute *attr, char *buf)
1783
{
1784
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1785
1786
return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1787
}
1788
1789
static ssize_t rbd_pool_show(struct device *dev,
1790
struct device_attribute *attr, char *buf)
1791
{
1792
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1793
1794
return sprintf(buf, "%s\n", rbd_dev->pool_name);
1795
}
1796
1797
static ssize_t rbd_name_show(struct device *dev,
1798
struct device_attribute *attr, char *buf)
1799
{
1800
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1801
1802
return sprintf(buf, "%s\n", rbd_dev->obj);
1803
}
1804
1805
static ssize_t rbd_snap_show(struct device *dev,
1806
struct device_attribute *attr,
1807
char *buf)
1808
{
1809
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1810
1811
return sprintf(buf, "%s\n", rbd_dev->snap_name);
1812
}
1813
1814
static ssize_t rbd_image_refresh(struct device *dev,
1815
struct device_attribute *attr,
1816
const char *buf,
1817
size_t size)
1818
{
1819
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1820
int rc;
1821
int ret = size;
1822
1823
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1824
1825
rc = __rbd_update_snaps(rbd_dev);
1826
if (rc < 0)
1827
ret = rc;
1828
1829
mutex_unlock(&ctl_mutex);
1830
return ret;
1831
}
1832
1833
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1834
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1835
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1836
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1837
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1838
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1839
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1840
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1841
static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1842
1843
static struct attribute *rbd_attrs[] = {
1844
&dev_attr_size.attr,
1845
&dev_attr_major.attr,
1846
&dev_attr_client_id.attr,
1847
&dev_attr_pool.attr,
1848
&dev_attr_name.attr,
1849
&dev_attr_current_snap.attr,
1850
&dev_attr_refresh.attr,
1851
&dev_attr_create_snap.attr,
1852
&dev_attr_rollback_snap.attr,
1853
NULL
1854
};
1855
1856
static struct attribute_group rbd_attr_group = {
1857
.attrs = rbd_attrs,
1858
};
1859
1860
static const struct attribute_group *rbd_attr_groups[] = {
1861
&rbd_attr_group,
1862
NULL
1863
};
1864
1865
/* Empty release hook; it is the .release of rbd_device_type. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1868
1869
static struct device_type rbd_device_type = {
1870
.name = "rbd",
1871
.groups = rbd_attr_groups,
1872
.release = rbd_sysfs_dev_release,
1873
};
1874
1875
1876
/*
 * sysfs: attributes of a snapshot device
 */
1879
1880
static ssize_t rbd_snap_size_show(struct device *dev,
1881
struct device_attribute *attr,
1882
char *buf)
1883
{
1884
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1885
1886
return sprintf(buf, "%lld\n", (long long)snap->size);
1887
}
1888
1889
static ssize_t rbd_snap_id_show(struct device *dev,
1890
struct device_attribute *attr,
1891
char *buf)
1892
{
1893
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1894
1895
return sprintf(buf, "%lld\n", (long long)snap->id);
1896
}
1897
1898
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1899
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1900
1901
static struct attribute *rbd_snap_attrs[] = {
1902
&dev_attr_snap_size.attr,
1903
&dev_attr_snap_id.attr,
1904
NULL,
1905
};
1906
1907
static struct attribute_group rbd_snap_attr_group = {
1908
.attrs = rbd_snap_attrs,
1909
};
1910
1911
static void rbd_snap_dev_release(struct device *dev)
1912
{
1913
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1914
kfree(snap->name);
1915
kfree(snap);
1916
}
1917
1918
static const struct attribute_group *rbd_snap_attr_groups[] = {
1919
&rbd_snap_attr_group,
1920
NULL
1921
};
1922
1923
static struct device_type rbd_snap_device_type = {
1924
.groups = rbd_snap_attr_groups,
1925
.release = rbd_snap_dev_release,
1926
};
1927
1928
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1929
struct rbd_snap *snap)
1930
{
1931
list_del(&snap->node);
1932
device_unregister(&snap->dev);
1933
}
1934
1935
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1936
struct rbd_snap *snap,
1937
struct device *parent)
1938
{
1939
struct device *dev = &snap->dev;
1940
int ret;
1941
1942
dev->type = &rbd_snap_device_type;
1943
dev->parent = parent;
1944
dev->release = rbd_snap_dev_release;
1945
dev_set_name(dev, "snap_%s", snap->name);
1946
ret = device_register(dev);
1947
1948
return ret;
1949
}
1950
1951
static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1952
int i, const char *name,
1953
struct rbd_snap **snapp)
1954
{
1955
int ret;
1956
struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1957
if (!snap)
1958
return -ENOMEM;
1959
snap->name = kstrdup(name, GFP_KERNEL);
1960
snap->size = rbd_dev->header.snap_sizes[i];
1961
snap->id = rbd_dev->header.snapc->snaps[i];
1962
if (device_is_registered(&rbd_dev->dev)) {
1963
ret = rbd_register_snap_dev(rbd_dev, snap,
1964
&rbd_dev->dev);
1965
if (ret < 0)
1966
goto err;
1967
}
1968
*snapp = snap;
1969
return 0;
1970
err:
1971
kfree(snap->name);
1972
kfree(snap);
1973
return ret;
1974
}
1975
1976
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points at the start of an entry (or just past the last
 * terminator) inside the list beginning at @start.  Returns the start of
 * the entry that precedes @name, or NULL when @name is less than two
 * bytes past @start, i.e. there is no preceding entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/*
	 * name[-1] is the terminator of the previous entry, so scanning
	 * starts at its last character and walks back to the terminator
	 * before it (or to the start of the list).
	 */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		if (cursor == start)
			return start;
	}

	return cursor + 1;
}
1992
1993
/*
1994
* compare the old list of snapshots that we have to what's in the header
1995
* and update it accordingly. Note that the header holds the snapshots
1996
* in a reverse order (from newest to oldest) and we need to go from
1997
* older to new so that we don't get a duplicate snap name when
1998
* doing the process (e.g., removed snapshot and recreated a new
1999
* one with the same name.
2000
*/
2001
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2002
{
2003
const char *name, *first_name;
2004
int i = rbd_dev->header.total_snaps;
2005
struct rbd_snap *snap, *old_snap = NULL;
2006
int ret;
2007
struct list_head *p, *n;
2008
2009
first_name = rbd_dev->header.snap_names;
2010
name = first_name + rbd_dev->header.snap_names_len;
2011
2012
list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2013
u64 cur_id;
2014
2015
old_snap = list_entry(p, struct rbd_snap, node);
2016
2017
if (i)
2018
cur_id = rbd_dev->header.snapc->snaps[i - 1];
2019
2020
if (!i || old_snap->id < cur_id) {
2021
/* old_snap->id was skipped, thus was removed */
2022
__rbd_remove_snap_dev(rbd_dev, old_snap);
2023
continue;
2024
}
2025
if (old_snap->id == cur_id) {
2026
/* we have this snapshot already */
2027
i--;
2028
name = rbd_prev_snap_name(name, first_name);
2029
continue;
2030
}
2031
for (; i > 0;
2032
i--, name = rbd_prev_snap_name(name, first_name)) {
2033
if (!name) {
2034
WARN_ON(1);
2035
return -EINVAL;
2036
}
2037
cur_id = rbd_dev->header.snapc->snaps[i];
2038
/* snapshot removal? handle it above */
2039
if (cur_id >= old_snap->id)
2040
break;
2041
/* a new snapshot */
2042
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2043
if (ret < 0)
2044
return ret;
2045
2046
/* note that we add it backward so using n and not p */
2047
list_add(&snap->node, n);
2048
p = &snap->node;
2049
}
2050
}
2051
/* we're done going over the old snap list, just add what's left */
2052
for (; i > 0; i--) {
2053
name = rbd_prev_snap_name(name, first_name);
2054
if (!name) {
2055
WARN_ON(1);
2056
return -EINVAL;
2057
}
2058
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2059
if (ret < 0)
2060
return ret;
2061
list_add(&snap->node, &rbd_dev->snaps);
2062
}
2063
2064
return 0;
2065
}
2066
2067
2068
/* Empty release hook for the statically allocated rbd_root_dev. */
static void rbd_root_dev_release(struct device *dev)
{
}
2071
2072
static struct device rbd_root_dev = {
2073
.init_name = "rbd",
2074
.release = rbd_root_dev_release,
2075
};
2076
2077
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2078
{
2079
int ret = -ENOMEM;
2080
struct device *dev;
2081
struct rbd_snap *snap;
2082
2083
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2084
dev = &rbd_dev->dev;
2085
2086
dev->bus = &rbd_bus_type;
2087
dev->type = &rbd_device_type;
2088
dev->parent = &rbd_root_dev;
2089
dev->release = rbd_dev_release;
2090
dev_set_name(dev, "%d", rbd_dev->id);
2091
ret = device_register(dev);
2092
if (ret < 0)
2093
goto done_free;
2094
2095
list_for_each_entry(snap, &rbd_dev->snaps, node) {
2096
ret = rbd_register_snap_dev(rbd_dev, snap,
2097
&rbd_dev->dev);
2098
if (ret < 0)
2099
break;
2100
}
2101
2102
mutex_unlock(&ctl_mutex);
2103
return 0;
2104
done_free:
2105
mutex_unlock(&ctl_mutex);
2106
return ret;
2107
}
2108
2109
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2110
{
2111
device_unregister(&rbd_dev->dev);
2112
}
2113
2114
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2115
{
2116
int ret, rc;
2117
2118
do {
2119
ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2120
rbd_dev->header.obj_version);
2121
if (ret == -ERANGE) {
2122
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2123
rc = __rbd_update_snaps(rbd_dev);
2124
mutex_unlock(&ctl_mutex);
2125
if (rc < 0)
2126
return rc;
2127
}
2128
} while (ret == -ERANGE);
2129
2130
return ret;
2131
}
2132
2133
static ssize_t rbd_add(struct bus_type *bus,
2134
const char *buf,
2135
size_t count)
2136
{
2137
struct ceph_osd_client *osdc;
2138
struct rbd_device *rbd_dev;
2139
ssize_t rc = -ENOMEM;
2140
int irc, new_id = 0;
2141
struct list_head *tmp;
2142
char *mon_dev_name;
2143
char *options;
2144
2145
if (!try_module_get(THIS_MODULE))
2146
return -ENODEV;
2147
2148
mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2149
if (!mon_dev_name)
2150
goto err_out_mod;
2151
2152
options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2153
if (!options)
2154
goto err_mon_dev;
2155
2156
/* new rbd_device object */
2157
rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2158
if (!rbd_dev)
2159
goto err_out_opt;
2160
2161
/* static rbd_device initialization */
2162
spin_lock_init(&rbd_dev->lock);
2163
INIT_LIST_HEAD(&rbd_dev->node);
2164
INIT_LIST_HEAD(&rbd_dev->snaps);
2165
2166
/* generate unique id: find highest unique id, add one */
2167
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2168
2169
list_for_each(tmp, &rbd_dev_list) {
2170
struct rbd_device *rbd_dev;
2171
2172
rbd_dev = list_entry(tmp, struct rbd_device, node);
2173
if (rbd_dev->id >= new_id)
2174
new_id = rbd_dev->id + 1;
2175
}
2176
2177
rbd_dev->id = new_id;
2178
2179
/* add to global list */
2180
list_add_tail(&rbd_dev->node, &rbd_dev_list);
2181
2182
/* parse add command */
2183
if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2184
"%" __stringify(RBD_MAX_OPT_LEN) "s "
2185
"%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2186
"%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2187
"%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2188
mon_dev_name, options, rbd_dev->pool_name,
2189
rbd_dev->obj, rbd_dev->snap_name) < 4) {
2190
rc = -EINVAL;
2191
goto err_out_slot;
2192
}
2193
2194
if (rbd_dev->snap_name[0] == 0)
2195
rbd_dev->snap_name[0] = '-';
2196
2197
rbd_dev->obj_len = strlen(rbd_dev->obj);
2198
snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2199
rbd_dev->obj, RBD_SUFFIX);
2200
2201
/* initialize rest of new object */
2202
snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2203
rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2204
if (rc < 0)
2205
goto err_out_slot;
2206
2207
mutex_unlock(&ctl_mutex);
2208
2209
/* pick the pool */
2210
osdc = &rbd_dev->client->osdc;
2211
rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2212
if (rc < 0)
2213
goto err_out_client;
2214
rbd_dev->poolid = rc;
2215
2216
/* register our block device */
2217
irc = register_blkdev(0, rbd_dev->name);
2218
if (irc < 0) {
2219
rc = irc;
2220
goto err_out_client;
2221
}
2222
rbd_dev->major = irc;
2223
2224
rc = rbd_bus_add_dev(rbd_dev);
2225
if (rc)
2226
goto err_out_blkdev;
2227
2228
/* set up and announce blkdev mapping */
2229
rc = rbd_init_disk(rbd_dev);
2230
if (rc)
2231
goto err_out_bus;
2232
2233
rc = rbd_init_watch_dev(rbd_dev);
2234
if (rc)
2235
goto err_out_bus;
2236
2237
return count;
2238
2239
err_out_bus:
2240
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2241
list_del_init(&rbd_dev->node);
2242
mutex_unlock(&ctl_mutex);
2243
2244
/* this will also clean up rest of rbd_dev stuff */
2245
2246
rbd_bus_del_dev(rbd_dev);
2247
kfree(options);
2248
kfree(mon_dev_name);
2249
return rc;
2250
2251
err_out_blkdev:
2252
unregister_blkdev(rbd_dev->major, rbd_dev->name);
2253
err_out_client:
2254
rbd_put_client(rbd_dev);
2255
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2256
err_out_slot:
2257
list_del_init(&rbd_dev->node);
2258
mutex_unlock(&ctl_mutex);
2259
2260
kfree(rbd_dev);
2261
err_out_opt:
2262
kfree(options);
2263
err_mon_dev:
2264
kfree(mon_dev_name);
2265
err_out_mod:
2266
dout("Error adding device %s\n", buf);
2267
module_put(THIS_MODULE);
2268
return rc;
2269
}
2270
2271
static struct rbd_device *__rbd_get_dev(unsigned long id)
2272
{
2273
struct list_head *tmp;
2274
struct rbd_device *rbd_dev;
2275
2276
list_for_each(tmp, &rbd_dev_list) {
2277
rbd_dev = list_entry(tmp, struct rbd_device, node);
2278
if (rbd_dev->id == id)
2279
return rbd_dev;
2280
}
2281
return NULL;
2282
}
2283
2284
static void rbd_dev_release(struct device *dev)
2285
{
2286
struct rbd_device *rbd_dev =
2287
container_of(dev, struct rbd_device, dev);
2288
2289
if (rbd_dev->watch_request)
2290
ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2291
rbd_dev->watch_request);
2292
if (rbd_dev->watch_event)
2293
ceph_osdc_cancel_event(rbd_dev->watch_event);
2294
2295
rbd_put_client(rbd_dev);
2296
2297
/* clean up and free blkdev */
2298
rbd_free_disk(rbd_dev);
2299
unregister_blkdev(rbd_dev->major, rbd_dev->name);
2300
kfree(rbd_dev);
2301
2302
/* release module ref */
2303
module_put(THIS_MODULE);
2304
}
2305
2306
static ssize_t rbd_remove(struct bus_type *bus,
2307
const char *buf,
2308
size_t count)
2309
{
2310
struct rbd_device *rbd_dev = NULL;
2311
int target_id, rc;
2312
unsigned long ul;
2313
int ret = count;
2314
2315
rc = strict_strtoul(buf, 10, &ul);
2316
if (rc)
2317
return rc;
2318
2319
/* convert to int; abort if we lost anything in the conversion */
2320
target_id = (int) ul;
2321
if (target_id != ul)
2322
return -EINVAL;
2323
2324
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2325
2326
rbd_dev = __rbd_get_dev(target_id);
2327
if (!rbd_dev) {
2328
ret = -ENOENT;
2329
goto done;
2330
}
2331
2332
list_del_init(&rbd_dev->node);
2333
2334
__rbd_remove_all_snaps(rbd_dev);
2335
rbd_bus_del_dev(rbd_dev);
2336
2337
done:
2338
mutex_unlock(&ctl_mutex);
2339
return ret;
2340
}
2341
2342
static ssize_t rbd_snap_add(struct device *dev,
2343
struct device_attribute *attr,
2344
const char *buf,
2345
size_t count)
2346
{
2347
struct rbd_device *rbd_dev = dev_to_rbd(dev);
2348
int ret;
2349
char *name = kmalloc(count + 1, GFP_KERNEL);
2350
if (!name)
2351
return -ENOMEM;
2352
2353
snprintf(name, count, "%s", buf);
2354
2355
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2356
2357
ret = rbd_header_add_snap(rbd_dev,
2358
name, GFP_KERNEL);
2359
if (ret < 0)
2360
goto err_unlock;
2361
2362
ret = __rbd_update_snaps(rbd_dev);
2363
if (ret < 0)
2364
goto err_unlock;
2365
2366
/* shouldn't hold ctl_mutex when notifying.. notify might
2367
trigger a watch callback that would need to get that mutex */
2368
mutex_unlock(&ctl_mutex);
2369
2370
/* make a best effort, don't error if failed */
2371
rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2372
2373
ret = count;
2374
kfree(name);
2375
return ret;
2376
2377
err_unlock:
2378
mutex_unlock(&ctl_mutex);
2379
kfree(name);
2380
return ret;
2381
}
2382
2383
static ssize_t rbd_snap_rollback(struct device *dev,
2384
struct device_attribute *attr,
2385
const char *buf,
2386
size_t count)
2387
{
2388
struct rbd_device *rbd_dev = dev_to_rbd(dev);
2389
int ret;
2390
u64 snapid;
2391
u64 cur_ofs;
2392
char *seg_name = NULL;
2393
char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2394
ret = -ENOMEM;
2395
if (!snap_name)
2396
return ret;
2397
2398
/* parse snaps add command */
2399
snprintf(snap_name, count, "%s", buf);
2400
seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2401
if (!seg_name)
2402
goto done;
2403
2404
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2405
2406
ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2407
if (ret < 0)
2408
goto done_unlock;
2409
2410
dout("snapid=%lld\n", snapid);
2411
2412
cur_ofs = 0;
2413
while (cur_ofs < rbd_dev->header.image_size) {
2414
cur_ofs += rbd_get_segment(&rbd_dev->header,
2415
rbd_dev->obj,
2416
cur_ofs, (u64)-1,
2417
seg_name, NULL);
2418
dout("seg_name=%s\n", seg_name);
2419
2420
ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2421
if (ret < 0)
2422
pr_warning("could not roll back obj %s err=%d\n",
2423
seg_name, ret);
2424
}
2425
2426
ret = __rbd_update_snaps(rbd_dev);
2427
if (ret < 0)
2428
goto done_unlock;
2429
2430
ret = count;
2431
2432
done_unlock:
2433
mutex_unlock(&ctl_mutex);
2434
done:
2435
kfree(seg_name);
2436
kfree(snap_name);
2437
2438
return ret;
2439
}
2440
2441
static struct bus_attribute rbd_bus_attrs[] = {
2442
__ATTR(add, S_IWUSR, NULL, rbd_add),
2443
__ATTR(remove, S_IWUSR, NULL, rbd_remove),
2444
__ATTR_NULL
2445
};
2446
2447
/*
2448
* create control files in sysfs
2449
* /sys/bus/rbd/...
2450
*/
2451
static int rbd_sysfs_init(void)
2452
{
2453
int ret;
2454
2455
rbd_bus_type.bus_attrs = rbd_bus_attrs;
2456
2457
ret = bus_register(&rbd_bus_type);
2458
if (ret < 0)
2459
return ret;
2460
2461
ret = device_register(&rbd_root_dev);
2462
2463
return ret;
2464
}
2465
2466
static void rbd_sysfs_cleanup(void)
2467
{
2468
device_unregister(&rbd_root_dev);
2469
bus_unregister(&rbd_bus_type);
2470
}
2471
2472
/*
 * Module init: set up the sysfs interface, then initialise node_lock and
 * log that the driver is loaded.  Returns 0 or the sysfs setup error.
 */
int __init rbd_init(void)
{
	int rc = rbd_sysfs_init();

	if (!rc) {
		spin_lock_init(&node_lock);
		pr_info("loaded " DRV_NAME_LONG "\n");
	}
	return rc;
}
2483
2484
/* Module exit: remove the sysfs interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2488
2489
module_init(rbd_init);
2490
module_exit(rbd_exit);
2491
2492
MODULE_AUTHOR("Sage Weil <[email protected]>");
2493
MODULE_AUTHOR("Yehuda Sadeh <[email protected]>");
2494
MODULE_DESCRIPTION("rados block device");
2495
2496
/* following authorship retained from original osdblk.c */
2497
MODULE_AUTHOR("Jeff Garzik <[email protected]>");
2498
2499
MODULE_LICENSE("GPL");
2500
2501