GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/ceph/osd_client.c
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN 4096
#define OSD_OPREPLY_FRONT_LEN 512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req);

static int op_needs_trail(int op)
{
    switch (op) {
    case CEPH_OSD_OP_GETXATTR:
    case CEPH_OSD_OP_SETXATTR:
    case CEPH_OSD_OP_CMPXATTR:
    case CEPH_OSD_OP_CALL:
    case CEPH_OSD_OP_NOTIFY:
        return 1;
    default:
        return 0;
    }
}

static int op_has_extent(int op)
{
    return (op == CEPH_OSD_OP_READ ||
            op == CEPH_OSD_OP_WRITE);
}

void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                          struct ceph_file_layout *layout,
                          u64 snapid,
                          u64 off, u64 *plen, u64 *bno,
                          struct ceph_osd_request *req,
                          struct ceph_osd_req_op *op)
{
    struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
    u64 orig_len = *plen;
    u64 objoff, objlen;    /* extent in object */

    reqhead->snapid = cpu_to_le64(snapid);

    /* object extent? */
    ceph_calc_file_object_mapping(layout, off, plen, bno,
                                  &objoff, &objlen);
    if (*plen < orig_len)
        dout(" skipping last %llu, final file extent %llu~%llu\n",
             orig_len - *plen, off, *plen);

    if (op_has_extent(op->op)) {
        op->extent.offset = objoff;
        op->extent.length = objlen;
    }
    req->r_num_pages = calc_pages_for(off, *plen);
    req->r_page_alignment = off & ~PAGE_MASK;
    if (op->op == CEPH_OSD_OP_WRITE)
        op->payload_len = *plen;

    dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
         *bno, objoff, objlen, req->r_num_pages);

}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
                        struct ceph_vino vino,
                        struct ceph_file_layout *layout,
                        u64 off, u64 *plen,
                        struct ceph_osd_request *req,
                        struct ceph_osd_req_op *op)
{
    u64 bno;

    ceph_calc_raw_layout(osdc, layout, vino.snap, off,
                         plen, &bno, req, op);

    snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
    req->r_oid_len = strlen(req->r_oid);
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
    struct ceph_osd_request *req = container_of(kref,
                                                struct ceph_osd_request,
                                                r_kref);

    if (req->r_request)
        ceph_msg_put(req->r_request);
    if (req->r_reply)
        ceph_msg_put(req->r_reply);
    if (req->r_con_filling_msg) {
        dout("release_request revoking pages %p from con %p\n",
             req->r_pages, req->r_con_filling_msg);
        ceph_con_revoke_message(req->r_con_filling_msg,
                                req->r_reply);
        ceph_con_put(req->r_con_filling_msg);
    }
    if (req->r_own_pages)
        ceph_release_page_vector(req->r_pages,
                                 req->r_num_pages);
#ifdef CONFIG_BLOCK
    if (req->r_bio)
        bio_put(req->r_bio);
#endif
    ceph_put_snap_context(req->r_snapc);
    if (req->r_trail) {
        ceph_pagelist_release(req->r_trail);
        kfree(req->r_trail);
    }
    if (req->r_mempool)
        mempool_free(req, req->r_osdc->req_mempool);
    else
        kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
    int i = 0;

    if (needs_trail)
        *needs_trail = 0;
    while (ops[i].op) {
        if (needs_trail && op_needs_trail(ops[i].op))
            *needs_trail = 1;
        i++;
    }

    return i;
}
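
/*
 * Note (added commentary): the ops array handed to get_num_ops() and
 * ceph_osdc_alloc_request() must be terminated by an entry whose .op
 * field is 0.  A minimal illustrative sketch, not from a real caller:
 *
 *    struct ceph_osd_req_op ops[2];
 *
 *    ops[0].op = CEPH_OSD_OP_READ;
 *    ops[1].op = 0;          // terminator: get_num_ops() returns 1
 */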

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                                 int flags,
                                                 struct ceph_snap_context *snapc,
                                                 struct ceph_osd_req_op *ops,
                                                 bool use_mempool,
                                                 gfp_t gfp_flags,
                                                 struct page **pages,
                                                 struct bio *bio)
{
    struct ceph_osd_request *req;
    struct ceph_msg *msg;
    int needs_trail;
    int num_op = get_num_ops(ops, &needs_trail);
    size_t msg_size = sizeof(struct ceph_osd_request_head);

    msg_size += num_op*sizeof(struct ceph_osd_op);

    if (use_mempool) {
        req = mempool_alloc(osdc->req_mempool, gfp_flags);
        memset(req, 0, sizeof(*req));
    } else {
        req = kzalloc(sizeof(*req), gfp_flags);
    }
    if (req == NULL)
        return NULL;

    req->r_osdc = osdc;
    req->r_mempool = use_mempool;

    kref_init(&req->r_kref);
    init_completion(&req->r_completion);
    init_completion(&req->r_safe_completion);
    INIT_LIST_HEAD(&req->r_unsafe_item);
    INIT_LIST_HEAD(&req->r_linger_item);
    INIT_LIST_HEAD(&req->r_linger_osd);
    req->r_flags = flags;

    WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

    /* create reply message */
    if (use_mempool)
        msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
    else
        msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
                           OSD_OPREPLY_FRONT_LEN, gfp_flags);
    if (!msg) {
        ceph_osdc_put_request(req);
        return NULL;
    }
    req->r_reply = msg;

    /* allocate space for the trailing data */
    if (needs_trail) {
        req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
        if (!req->r_trail) {
            ceph_osdc_put_request(req);
            return NULL;
        }
        ceph_pagelist_init(req->r_trail);
    }
    /* create request message; allow space for oid */
    msg_size += 40;
    if (snapc)
        msg_size += sizeof(u64) * snapc->num_snaps;
    if (use_mempool)
        msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
    else
        msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
    if (!msg) {
        ceph_osdc_put_request(req);
        return NULL;
    }

    msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
    memset(msg->front.iov_base, 0, msg->front.iov_len);

    req->r_request = msg;
    req->r_pages = pages;
#ifdef CONFIG_BLOCK
    if (bio) {
        req->r_bio = bio;
        bio_get(req->r_bio);
    }
#endif

    return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static void osd_req_encode_op(struct ceph_osd_request *req,
                              struct ceph_osd_op *dst,
                              struct ceph_osd_req_op *src)
{
    dst->op = cpu_to_le16(src->op);

    switch (dst->op) {
    case CEPH_OSD_OP_READ:
    case CEPH_OSD_OP_WRITE:
        dst->extent.offset =
            cpu_to_le64(src->extent.offset);
        dst->extent.length =
            cpu_to_le64(src->extent.length);
        dst->extent.truncate_size =
            cpu_to_le64(src->extent.truncate_size);
        dst->extent.truncate_seq =
            cpu_to_le32(src->extent.truncate_seq);
        break;

    case CEPH_OSD_OP_GETXATTR:
    case CEPH_OSD_OP_SETXATTR:
    case CEPH_OSD_OP_CMPXATTR:
        BUG_ON(!req->r_trail);

        dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
        dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
        dst->xattr.cmp_op = src->xattr.cmp_op;
        dst->xattr.cmp_mode = src->xattr.cmp_mode;
        ceph_pagelist_append(req->r_trail, src->xattr.name,
                             src->xattr.name_len);
        ceph_pagelist_append(req->r_trail, src->xattr.val,
                             src->xattr.value_len);
        break;
    case CEPH_OSD_OP_CALL:
        BUG_ON(!req->r_trail);

        dst->cls.class_len = src->cls.class_len;
        dst->cls.method_len = src->cls.method_len;
        dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

        ceph_pagelist_append(req->r_trail, src->cls.class_name,
                             src->cls.class_len);
        ceph_pagelist_append(req->r_trail, src->cls.method_name,
                             src->cls.method_len);
        ceph_pagelist_append(req->r_trail, src->cls.indata,
                             src->cls.indata_len);
        break;
    case CEPH_OSD_OP_ROLLBACK:
        dst->snap.snapid = cpu_to_le64(src->snap.snapid);
        break;
    case CEPH_OSD_OP_STARTSYNC:
        break;
    case CEPH_OSD_OP_NOTIFY:
        {
            __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
            __le32 timeout = cpu_to_le32(src->watch.timeout);

            BUG_ON(!req->r_trail);

            ceph_pagelist_append(req->r_trail,
                                 &prot_ver, sizeof(prot_ver));
            ceph_pagelist_append(req->r_trail,
                                 &timeout, sizeof(timeout));
        }
        /* fall through: NOTIFY also encodes the watch fields below */
    case CEPH_OSD_OP_NOTIFY_ACK:
    case CEPH_OSD_OP_WATCH:
        dst->watch.cookie = cpu_to_le64(src->watch.cookie);
        dst->watch.ver = cpu_to_le64(src->watch.ver);
        dst->watch.flag = src->watch.flag;
        break;
    default:
        pr_err("unrecognized osd opcode %d\n", dst->op);
        WARN_ON(1);
        break;
    }
    dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 *
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
                             u64 off, u64 *plen,
                             struct ceph_osd_req_op *src_ops,
                             struct ceph_snap_context *snapc,
                             struct timespec *mtime,
                             const char *oid,
                             int oid_len)
{
    struct ceph_msg *msg = req->r_request;
    struct ceph_osd_request_head *head;
    struct ceph_osd_req_op *src_op;
    struct ceph_osd_op *op;
    void *p;
    int num_op = get_num_ops(src_ops, NULL);
    size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
    int flags = req->r_flags;
    u64 data_len = 0;
    int i;

    head = msg->front.iov_base;
    op = (void *)(head + 1);
    p = (void *)(op + num_op);

    req->r_snapc = ceph_get_snap_context(snapc);

    head->client_inc = cpu_to_le32(1); /* always, for now. */
    head->flags = cpu_to_le32(flags);
    if (flags & CEPH_OSD_FLAG_WRITE)
        ceph_encode_timespec(&head->mtime, mtime);
    head->num_ops = cpu_to_le16(num_op);


    /* fill in oid */
    head->object_len = cpu_to_le32(oid_len);
    memcpy(p, oid, oid_len);
    p += oid_len;

    src_op = src_ops;
    while (src_op->op) {
        osd_req_encode_op(req, op, src_op);
        src_op++;
        op++;
    }

    if (req->r_trail)
        data_len += req->r_trail->length;

    if (snapc) {
        head->snap_seq = cpu_to_le64(snapc->seq);
        head->num_snaps = cpu_to_le32(snapc->num_snaps);
        for (i = 0; i < snapc->num_snaps; i++) {
            put_unaligned_le64(snapc->snaps[i], p);
            p += sizeof(u64);
        }
    }

    if (flags & CEPH_OSD_FLAG_WRITE) {
        req->r_request->hdr.data_off = cpu_to_le16(off);
        req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
    } else if (data_len) {
        req->r_request->hdr.data_off = 0;
        req->r_request->hdr.data_len = cpu_to_le32(data_len);
    }

    req->r_request->page_alignment = req->r_page_alignment;

    BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
    msg_size = p - msg->front.iov_base;
    msg->front.iov_len = msg_size;
    msg->hdr.front_len = cpu_to_le32(msg_size);
    return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               int do_sync,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
                                               bool use_mempool, int num_reply,
                                               int page_align)
{
    struct ceph_osd_req_op ops[3];
    struct ceph_osd_request *req;

    ops[0].op = opcode;
    ops[0].extent.truncate_seq = truncate_seq;
    ops[0].extent.truncate_size = truncate_size;
    ops[0].payload_len = 0;

    if (do_sync) {
        ops[1].op = CEPH_OSD_OP_STARTSYNC;
        ops[1].payload_len = 0;
        ops[2].op = 0;
    } else
        ops[1].op = 0;

    req = ceph_osdc_alloc_request(osdc, flags,
                                  snapc, ops,
                                  use_mempool,
                                  GFP_NOFS, NULL, NULL);
    if (!req)
        return NULL;

    /* calculate max write size */
    calc_layout(osdc, vino, layout, off, plen, req, ops);
    req->r_file_layout = *layout; /* keep a copy */

    /* in case it differs from natural (file) alignment that
       calc_layout filled in for us */
    req->r_num_pages = calc_pages_for(page_align, *plen);
    req->r_page_alignment = page_align;

    ceph_osdc_build_request(req, off, plen, ops,
                            snapc,
                            mtime,
                            req->r_oid, req->r_oid_len);

    return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
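
/*
 * Illustrative caller sketch for ceph_osdc_new_request(); the values are
 * made up and error handling is elided (compare ceph_osdc_readpages()
 * below for an in-tree caller):
 *
 *    u64 len = 4096;
 *    struct ceph_osd_request *req;
 *
 *    req = ceph_osdc_new_request(osdc, layout, vino, 0, &len,
 *                                CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *                                NULL, 0, truncate_seq, truncate_size,
 *                                NULL, false, 1, 0);
 *    // on return, len may have been shortened at an object boundary
 */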

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *new)
{
    struct rb_node **p = &osdc->requests.rb_node;
    struct rb_node *parent = NULL;
    struct ceph_osd_request *req = NULL;

    while (*p) {
        parent = *p;
        req = rb_entry(parent, struct ceph_osd_request, r_node);
        if (new->r_tid < req->r_tid)
            p = &(*p)->rb_left;
        else if (new->r_tid > req->r_tid)
            p = &(*p)->rb_right;
        else
            BUG();
    }

    rb_link_node(&new->r_node, parent, p);
    rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
                                                 u64 tid)
{
    struct ceph_osd_request *req;
    struct rb_node *n = osdc->requests.rb_node;

    while (n) {
        req = rb_entry(n, struct ceph_osd_request, r_node);
        if (tid < req->r_tid)
            n = n->rb_left;
        else if (tid > req->r_tid)
            n = n->rb_right;
        else
            return req;
    }
    return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
                    u64 tid)
{
    struct ceph_osd_request *req;
    struct rb_node *n = osdc->requests.rb_node;

    while (n) {
        req = rb_entry(n, struct ceph_osd_request, r_node);
        if (tid < req->r_tid) {
            if (!n->rb_left)
                return req;
            n = n->rb_left;
        } else if (tid > req->r_tid) {
            n = n->rb_right;
        } else {
            return req;
        }
    }
    return NULL;
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
                                struct ceph_osd *osd)
{
    struct ceph_osd_request *req, *nreq;
    int err;

    dout("__kick_osd_requests osd%d\n", osd->o_osd);
    err = __reset_osd(osdc, osd);
    if (err == -EAGAIN)
        return;

    list_for_each_entry(req, &osd->o_requests, r_osd_item) {
        list_move(&req->r_req_lru_item, &osdc->req_unsent);
        dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
             osd->o_osd);
        if (!req->r_linger)
            req->r_flags |= CEPH_OSD_FLAG_RETRY;
    }

    list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                             r_linger_osd) {
        /*
         * reregister request prior to unregistering linger so
         * that r_osd is preserved.
         */
        BUG_ON(!list_empty(&req->r_req_lru_item));
        __register_request(osdc, req);
        list_add(&req->r_req_lru_item, &osdc->req_unsent);
        list_add(&req->r_osd_item, &req->r_osd->o_requests);
        __unregister_linger_request(osdc, req);
        dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
             osd->o_osd);
    }
}

static void kick_osd_requests(struct ceph_osd_client *osdc,
                              struct ceph_osd *kickosd)
{
    mutex_lock(&osdc->request_mutex);
    __kick_osd_requests(osdc, kickosd);
    mutex_unlock(&osdc->request_mutex);
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
    struct ceph_osd *osd = con->private;
    struct ceph_osd_client *osdc;

    if (!osd)
        return;
    dout("osd_reset osd%d\n", osd->o_osd);
    osdc = osd->o_osdc;
    down_read(&osdc->map_sem);
    kick_osd_requests(osdc, osd);
    send_queued(osdc);
    up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
    struct ceph_osd *osd;

    osd = kzalloc(sizeof(*osd), GFP_NOFS);
    if (!osd)
        return NULL;

    atomic_set(&osd->o_ref, 1);
    osd->o_osdc = osdc;
    INIT_LIST_HEAD(&osd->o_requests);
    INIT_LIST_HEAD(&osd->o_linger_requests);
    INIT_LIST_HEAD(&osd->o_osd_lru);
    osd->o_incarnation = 1;

    ceph_con_init(osdc->client->msgr, &osd->o_con);
    osd->o_con.private = osd;
    osd->o_con.ops = &osd_con_ops;
    osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

    INIT_LIST_HEAD(&osd->o_keepalive_item);
    return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
    if (atomic_inc_not_zero(&osd->o_ref)) {
        dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
             atomic_read(&osd->o_ref));
        return osd;
    } else {
        dout("get_osd %p FAIL\n", osd);
        return NULL;
    }
}

static void put_osd(struct ceph_osd *osd)
{
    dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
         atomic_read(&osd->o_ref) - 1);
    if (atomic_dec_and_test(&osd->o_ref)) {
        struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

        if (osd->o_authorizer)
            ac->ops->destroy_authorizer(ac, osd->o_authorizer);
        kfree(osd);
    }
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
    dout("__remove_osd %p\n", osd);
    BUG_ON(!list_empty(&osd->o_requests));
    rb_erase(&osd->o_node, &osdc->osds);
    list_del_init(&osd->o_osd_lru);
    ceph_con_close(&osd->o_con);
    put_osd(osd);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
{
    dout("__move_osd_to_lru %p\n", osd);
    BUG_ON(!list_empty(&osd->o_osd_lru));
    list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
    osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
    dout("__remove_osd_from_lru %p\n", osd);
    if (!list_empty(&osd->o_osd_lru))
        list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
    struct ceph_osd *osd, *nosd;

    dout("__remove_old_osds %p\n", osdc);
    mutex_lock(&osdc->request_mutex);
    list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
        if (!remove_all && time_before(jiffies, osd->lru_ttl))
            break;
        __remove_osd(osdc, osd);
    }
    mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
    struct ceph_osd_request *req;
    int ret = 0;

    dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
    if (list_empty(&osd->o_requests) &&
        list_empty(&osd->o_linger_requests)) {
        __remove_osd(osdc, osd);
    } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                      &osd->o_con.peer_addr,
                      sizeof(osd->o_con.peer_addr)) == 0 &&
               !ceph_con_opened(&osd->o_con)) {
        dout(" osd addr hasn't changed and connection never opened,"
             " letting msgr retry");
        /* touch each r_stamp for handle_timeout()'s benefit */
        list_for_each_entry(req, &osd->o_requests, r_osd_item)
            req->r_stamp = jiffies;
        ret = -EAGAIN;
    } else {
        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
        osd->o_incarnation++;
    }
    return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
    struct rb_node **p = &osdc->osds.rb_node;
    struct rb_node *parent = NULL;
    struct ceph_osd *osd = NULL;

    while (*p) {
        parent = *p;
        osd = rb_entry(parent, struct ceph_osd, o_node);
        if (new->o_osd < osd->o_osd)
            p = &(*p)->rb_left;
        else if (new->o_osd > osd->o_osd)
            p = &(*p)->rb_right;
        else
            BUG();
    }

    rb_link_node(&new->o_node, parent, p);
    rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
    struct ceph_osd *osd;
    struct rb_node *n = osdc->osds.rb_node;

    while (n) {
        osd = rb_entry(n, struct ceph_osd, o_node);
        if (o < osd->o_osd)
            n = n->rb_left;
        else if (o > osd->o_osd)
            n = n->rb_right;
        else
            return osd;
    }
    return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
    schedule_delayed_work(&osdc->timeout_work,
                          osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
    cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req)
{
    req->r_tid = ++osdc->last_tid;
    req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
    INIT_LIST_HEAD(&req->r_req_lru_item);

    dout("__register_request %p tid %lld\n", req, req->r_tid);
    __insert_request(osdc, req);
    ceph_osdc_get_request(req);
    osdc->num_requests++;

    if (osdc->num_requests == 1) {
        dout(" first request, scheduling timeout\n");
        __schedule_osd_timeout(osdc);
    }
}

static void register_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *req)
{
    mutex_lock(&osdc->request_mutex);
    __register_request(osdc, req);
    mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
{
    dout("__unregister_request %p tid %lld\n", req, req->r_tid);
    rb_erase(&req->r_node, &osdc->requests);
    osdc->num_requests--;

    if (req->r_osd) {
        /* make sure the original request isn't in flight. */
        ceph_con_revoke(&req->r_osd->o_con, req->r_request);

        list_del_init(&req->r_osd_item);
        if (list_empty(&req->r_osd->o_requests) &&
            list_empty(&req->r_osd->o_linger_requests)) {
            dout("moving osd to %p lru\n", req->r_osd);
            __move_osd_to_lru(osdc, req->r_osd);
        }
        if (list_empty(&req->r_linger_item))
            req->r_osd = NULL;
    }

    ceph_osdc_put_request(req);

    list_del_init(&req->r_req_lru_item);
    if (osdc->num_requests == 0) {
        dout(" no requests, canceling timeout\n");
        __cancel_osd_timeout(osdc);
    }
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
    if (req->r_sent && req->r_osd) {
        ceph_con_revoke(&req->r_osd->o_con, req->r_request);
        req->r_sent = 0;
    }
}

static void __register_linger_request(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req)
{
    dout("__register_linger_request %p\n", req);
    list_add_tail(&req->r_linger_item, &osdc->req_linger);
    list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
{
    dout("__unregister_linger_request %p\n", req);
    if (req->r_osd) {
        list_del_init(&req->r_linger_item);
        list_del_init(&req->r_linger_osd);

        if (list_empty(&req->r_osd->o_requests) &&
            list_empty(&req->r_osd->o_linger_requests)) {
            dout("moving osd to %p lru\n", req->r_osd);
            __move_osd_to_lru(osdc, req->r_osd);
        }
        if (list_empty(&req->r_osd_item))
            req->r_osd = NULL;
    }
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
                                         struct ceph_osd_request *req)
{
    mutex_lock(&osdc->request_mutex);
    if (req->r_linger) {
        __unregister_linger_request(osdc, req);
        ceph_osdc_put_request(req);
    }
    mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
{
    if (!req->r_linger) {
        dout("set_request_linger %p\n", req);
        req->r_linger = 1;
        /*
         * caller is now responsible for calling
         * unregister_linger_request
         */
        ceph_osdc_get_request(req);
    }
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
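
/*
 * Hypothetical linger usage sketch (not taken from a real caller): mark
 * a watch request as lingering before submitting it, and balance the
 * extra reference when the watch is torn down:
 *
 *    ceph_osdc_set_request_linger(osdc, req);
 *    ceph_osdc_start_request(osdc, req, false);
 *    ...
 *    ceph_osdc_unregister_linger_request(osdc, req);
 */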

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req)
{
    struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
    struct ceph_pg pgid;
    int acting[CEPH_PG_MAX_SIZE];
    int o = -1, num = 0;
    int err;

    dout("map_request %p tid %lld\n", req, req->r_tid);
    err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                  &req->r_file_layout, osdc->osdmap);
    if (err) {
        list_move(&req->r_req_lru_item, &osdc->req_notarget);
        return err;
    }
    pgid = reqhead->layout.ol_pgid;
    req->r_pgid = pgid;

    err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
    if (err > 0) {
        o = acting[0];
        num = err;
    }

    if ((req->r_osd && req->r_osd->o_osd == o &&
         req->r_sent >= req->r_osd->o_incarnation &&
         req->r_num_pg_osds == num &&
         memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
        (req->r_osd == NULL && o == -1))
        return 0;  /* no change */

    dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
         req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
         req->r_osd ? req->r_osd->o_osd : -1);

    /* record full pg acting set */
    memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
    req->r_num_pg_osds = num;

    if (req->r_osd) {
        __cancel_request(req);
        list_del_init(&req->r_osd_item);
        req->r_osd = NULL;
    }

    req->r_osd = __lookup_osd(osdc, o);
    if (!req->r_osd && o >= 0) {
        err = -ENOMEM;
        req->r_osd = create_osd(osdc);
        if (!req->r_osd) {
            list_move(&req->r_req_lru_item, &osdc->req_notarget);
            goto out;
        }

        dout("map_request osd %p is osd%d\n", req->r_osd, o);
        req->r_osd->o_osd = o;
        req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
        __insert_osd(osdc, req->r_osd);

        ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
    }

    if (req->r_osd) {
        __remove_osd_from_lru(req->r_osd);
        list_add(&req->r_osd_item, &req->r_osd->o_requests);
        list_move(&req->r_req_lru_item, &osdc->req_unsent);
    } else {
        list_move(&req->r_req_lru_item, &osdc->req_notarget);
    }
    err = 1;   /* osd or pg changed */

out:
    return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req)
{
    struct ceph_osd_request_head *reqhead;

    dout("send_request %p tid %llu to osd%d flags %d\n",
         req, req->r_tid, req->r_osd->o_osd, req->r_flags);

    reqhead = req->r_request->front.iov_base;
    reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
    reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
    reqhead->reassert_version = req->r_reassert_version;

    req->r_stamp = jiffies;
    list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

    ceph_msg_get(req->r_request); /* send consumes a ref */
    ceph_con_send(&req->r_osd->o_con, req->r_request);
    req->r_sent = req->r_osd->o_incarnation;
    return 0;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
    struct ceph_osd_request *req, *tmp;

    dout("send_queued\n");
    mutex_lock(&osdc->request_mutex);
    list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
        __send_request(osdc, req);
    }
    mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
    struct ceph_osd_client *osdc =
        container_of(work, struct ceph_osd_client, timeout_work.work);
    struct ceph_osd_request *req, *last_req = NULL;
    struct ceph_osd *osd;
    unsigned long timeout = osdc->client->options->osd_timeout * HZ;
    unsigned long keepalive =
        osdc->client->options->osd_keepalive_timeout * HZ;
    unsigned long last_stamp = 0;
    struct list_head slow_osds;
    dout("timeout\n");
    down_read(&osdc->map_sem);

    ceph_monc_request_next_osdmap(&osdc->client->monc);

    mutex_lock(&osdc->request_mutex);

    /*
     * reset osds that appear to be _really_ unresponsive.  this
     * is a failsafe measure.. we really shouldn't be getting to
     * this point if the system is working properly.  the monitors
     * should mark the osd as failed and we should find out about
     * it from an updated osd map.
     */
    while (timeout && !list_empty(&osdc->req_lru)) {
        req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
                         r_req_lru_item);

        if (time_before(jiffies, req->r_stamp + timeout))
            break;

        BUG_ON(req == last_req && req->r_stamp == last_stamp);
        last_req = req;
        last_stamp = req->r_stamp;

        osd = req->r_osd;
        BUG_ON(!osd);
        pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
                   req->r_tid, osd->o_osd);
        __kick_osd_requests(osdc, osd);
    }

    /*
     * ping osds that are a bit slow.  this ensures that if there
     * is a break in the TCP connection we will notice, and reopen
     * a connection with that osd (from the fault callback).
     */
    INIT_LIST_HEAD(&slow_osds);
    list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
        if (time_before(jiffies, req->r_stamp + keepalive))
            break;

        osd = req->r_osd;
        BUG_ON(!osd);
        dout(" tid %llu is slow, will send keepalive on osd%d\n",
             req->r_tid, osd->o_osd);
        list_move_tail(&osd->o_keepalive_item, &slow_osds);
    }
    while (!list_empty(&slow_osds)) {
        osd = list_entry(slow_osds.next, struct ceph_osd,
                         o_keepalive_item);
        list_del_init(&osd->o_keepalive_item);
        ceph_con_keepalive(&osd->o_con);
    }

    __schedule_osd_timeout(osdc);
    mutex_unlock(&osdc->request_mutex);
    send_queued(osdc);
    up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
    struct ceph_osd_client *osdc =
        container_of(work, struct ceph_osd_client,
                     osds_timeout_work.work);
    unsigned long delay =
        osdc->client->options->osd_idle_ttl * HZ >> 2;

    dout("osds timeout\n");
    down_read(&osdc->map_sem);
    remove_old_osds(osdc, 0);
    up_read(&osdc->map_sem);

    schedule_delayed_work(&osdc->osds_timeout_work,
                          round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
    if (req->r_safe_callback)
        req->r_safe_callback(req, NULL);
    complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
{
    struct ceph_osd_reply_head *rhead = msg->front.iov_base;
    struct ceph_osd_request *req;
    u64 tid;
    int numops, object_len, flags;
    s32 result;

    tid = le64_to_cpu(msg->hdr.tid);
    if (msg->front.iov_len < sizeof(*rhead))
        goto bad;
    numops = le32_to_cpu(rhead->num_ops);
    object_len = le32_to_cpu(rhead->object_len);
    result = le32_to_cpu(rhead->result);
    if (msg->front.iov_len != sizeof(*rhead) + object_len +
        numops * sizeof(struct ceph_osd_op))
        goto bad;
    dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
    /* lookup */
    mutex_lock(&osdc->request_mutex);
    req = __lookup_request(osdc, tid);
    if (req == NULL) {
        dout("handle_reply tid %llu dne\n", tid);
        mutex_unlock(&osdc->request_mutex);
        return;
    }
    ceph_osdc_get_request(req);
    flags = le32_to_cpu(rhead->flags);

    /*
     * if this connection filled our message, drop our reference now, to
     * avoid a (safe but slower) revoke later.
     */
    if (req->r_con_filling_msg == con && req->r_reply == msg) {
        dout(" dropping con_filling_msg ref %p\n", con);
        req->r_con_filling_msg = NULL;
        ceph_con_put(con);
    }

    if (!req->r_got_reply) {
        unsigned bytes;

        req->r_result = le32_to_cpu(rhead->result);
        bytes = le32_to_cpu(msg->hdr.data_len);
        dout("handle_reply result %d bytes %d\n", req->r_result,
             bytes);
        if (req->r_result == 0)
            req->r_result = bytes;

        /* in case this is a write and we need to replay, */
        req->r_reassert_version = rhead->reassert_version;

        req->r_got_reply = 1;
    } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
        dout("handle_reply tid %llu dup ack\n", tid);
        mutex_unlock(&osdc->request_mutex);
        goto done;
    }

    dout("handle_reply tid %llu flags %d\n", tid, flags);

    if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
        __register_linger_request(osdc, req);

    /* either this is a read, or we got the safe response */
    if (result < 0 ||
        (flags & CEPH_OSD_FLAG_ONDISK) ||
        ((flags & CEPH_OSD_FLAG_WRITE) == 0))
        __unregister_request(osdc, req);

    mutex_unlock(&osdc->request_mutex);

    if (req->r_callback)
        req->r_callback(req, msg);
    else
        complete_all(&req->r_completion);

    if (flags & CEPH_OSD_FLAG_ONDISK)
        complete_request(req);

done:
    dout("req=%p req->r_linger=%d\n", req, req->r_linger);
    ceph_osdc_put_request(req);
    return;

bad:
    pr_err("corrupt osd_op_reply got %d %d expected %d\n",
           (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
           (int)sizeof(*rhead));
    ceph_msg_dump(msg);
}

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
    struct rb_node *p, *n;

    for (p = rb_first(&osdc->osds); p; p = n) {
        struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

        n = rb_next(p);
        if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
            memcmp(&osd->o_con.peer_addr,
                   ceph_osd_addr(osdc->osdmap,
                                 osd->o_osd),
                   sizeof(struct ceph_entity_addr)) != 0)
            __reset_osd(osdc, osd);
    }
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc)
{
    struct ceph_osd_request *req, *nreq;
    struct rb_node *p;
    int needmap = 0;
    int err;

    dout("kick_requests\n");
    mutex_lock(&osdc->request_mutex);
    for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
        req = rb_entry(p, struct ceph_osd_request, r_node);
        err = __map_request(osdc, req);
        if (err < 0)
            continue;  /* error */
        if (req->r_osd == NULL) {
            dout("%p tid %llu maps to no osd\n", req, req->r_tid);
            needmap++;  /* request a newer map */
        } else if (err > 0) {
            dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
                 req->r_osd ? req->r_osd->o_osd : -1);
            if (!req->r_linger)
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
        }
    }

    list_for_each_entry_safe(req, nreq, &osdc->req_linger,
                             r_linger_item) {
        dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

        err = __map_request(osdc, req);
        if (err == 0)
            continue;  /* no change and no osd was specified */
        if (err < 0)
            continue;  /* hrm! */
        if (req->r_osd == NULL) {
            dout("tid %llu maps to no valid osd\n", req->r_tid);
            needmap++;  /* request a newer map */
            continue;
        }

        dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
             req->r_osd ? req->r_osd->o_osd : -1);
        __unregister_linger_request(osdc, req);
        __register_request(osdc, req);
    }
    mutex_unlock(&osdc->request_mutex);

    if (needmap) {
        dout("%d requests for down osds, need new map\n", needmap);
        ceph_monc_request_next_osdmap(&osdc->client->monc);
    }
}


/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
    void *p, *end, *next;
    u32 nr_maps, maplen;
    u32 epoch;
    struct ceph_osdmap *newmap = NULL, *oldmap;
    int err;
    struct ceph_fsid fsid;

    dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
    p = msg->front.iov_base;
    end = p + msg->front.iov_len;

    /* verify fsid */
    ceph_decode_need(&p, end, sizeof(fsid), bad);
    ceph_decode_copy(&p, &fsid, sizeof(fsid));
    if (ceph_check_fsid(osdc->client, &fsid) < 0)
        return;

    down_write(&osdc->map_sem);

    /* incremental maps */
    ceph_decode_32_safe(&p, end, nr_maps, bad);
    dout(" %d inc maps\n", nr_maps);
    while (nr_maps > 0) {
        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        ceph_decode_need(&p, end, maplen, bad);
        next = p + maplen;
        if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
            dout("applying incremental map %u len %d\n",
                 epoch, maplen);
            newmap = osdmap_apply_incremental(&p, next,
                                              osdc->osdmap,
                                              osdc->client->msgr);
            if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad;
            }
            BUG_ON(!newmap);
            if (newmap != osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = newmap;
            }
            kick_requests(osdc);
            reset_changed_osds(osdc);
        } else {
            dout("ignoring incremental map %u len %d\n",
                 epoch, maplen);
        }
        p = next;
        nr_maps--;
    }
    if (newmap)
        goto done;

    /* full maps */
    ceph_decode_32_safe(&p, end, nr_maps, bad);
    dout(" %d full maps\n", nr_maps);
    while (nr_maps) {
        ceph_decode_need(&p, end, 2*sizeof(u32), bad);
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        ceph_decode_need(&p, end, maplen, bad);
        if (nr_maps > 1) {
            dout("skipping non-latest full map %u len %d\n",
                 epoch, maplen);
        } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
            dout("skipping full map %u len %d, "
                 "older than our %u\n", epoch, maplen,
                 osdc->osdmap->epoch);
        } else {
            dout("taking full map %u len %d\n", epoch, maplen);
            newmap = osdmap_decode(&p, p+maplen);
            if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad;
            }
            BUG_ON(!newmap);
            oldmap = osdc->osdmap;
            osdc->osdmap = newmap;
            if (oldmap)
                ceph_osdmap_destroy(oldmap);
            kick_requests(osdc);
        }
        p += maplen;
        nr_maps--;
    }

done:
    downgrade_write(&osdc->map_sem);
    ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

    /*
     * subscribe to subsequent osdmap updates if full to ensure
     * we find out when we are no longer full and stop returning
     * ENOSPC.
     */
    if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
        ceph_monc_request_next_osdmap(&osdc->client->monc);

    send_queued(osdc);
    up_read(&osdc->map_sem);
    wake_up_all(&osdc->client->auth_wq);
    return;

bad:
    pr_err("osdc handle_map corrupt msg\n");
    ceph_msg_dump(msg);
    up_write(&osdc->map_sem);
    return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
static void __release_event(struct kref *kref)
{
    struct ceph_osd_event *event =
        container_of(kref, struct ceph_osd_event, kref);

    dout("__release_event %p\n", event);
    kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
    kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
    kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

static void __insert_event(struct ceph_osd_client *osdc,
                           struct ceph_osd_event *new)
{
    struct rb_node **p = &osdc->event_tree.rb_node;
    struct rb_node *parent = NULL;
    struct ceph_osd_event *event = NULL;

    while (*p) {
        parent = *p;
        event = rb_entry(parent, struct ceph_osd_event, node);
        if (new->cookie < event->cookie)
            p = &(*p)->rb_left;
        else if (new->cookie > event->cookie)
            p = &(*p)->rb_right;
        else
            BUG();
    }

    rb_link_node(&new->node, parent, p);
    rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
                                           u64 cookie)
{
    struct rb_node **p = &osdc->event_tree.rb_node;
    struct rb_node *parent = NULL;
    struct ceph_osd_event *event = NULL;

    while (*p) {
        parent = *p;
        event = rb_entry(parent, struct ceph_osd_event, node);
        if (cookie < event->cookie)
            p = &(*p)->rb_left;
        else if (cookie > event->cookie)
            p = &(*p)->rb_right;
        else
            return event;
    }
    return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
    struct ceph_osd_client *osdc = event->osdc;

    if (!RB_EMPTY_NODE(&event->node)) {
        dout("__remove_event removed %p\n", event);
        rb_erase(&event->node, &osdc->event_tree);
        ceph_osdc_put_event(event);
    } else {
        dout("__remove_event didn't remove %p\n", event);
    }
}

int ceph_osdc_create_event(struct ceph_osd_client *osdc,
                           void (*event_cb)(u64, u64, u8, void *),
                           int one_shot, void *data,
                           struct ceph_osd_event **pevent)
{
    struct ceph_osd_event *event;

    event = kmalloc(sizeof(*event), GFP_NOIO);
    if (!event)
        return -ENOMEM;

    dout("create_event %p\n", event);
    event->cb = event_cb;
    event->one_shot = one_shot;
    event->data = data;
    event->osdc = osdc;
    INIT_LIST_HEAD(&event->osd_node);
    kref_init(&event->kref);   /* one ref for us */
    kref_get(&event->kref);    /* one ref for the caller */
    init_completion(&event->completion);

    spin_lock(&osdc->event_lock);
    event->cookie = ++osdc->event_count;
    __insert_event(osdc, event);
    spin_unlock(&osdc->event_lock);

    *pevent = event;
    return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
    struct ceph_osd_client *osdc = event->osdc;

    dout("cancel_event %p\n", event);
    spin_lock(&osdc->event_lock);
    __remove_event(event);
    spin_unlock(&osdc->event_lock);
    ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);


static void do_event_work(struct work_struct *work)
{
    struct ceph_osd_event_work *event_work =
        container_of(work, struct ceph_osd_event_work, work);
    struct ceph_osd_event *event = event_work->event;
    u64 ver = event_work->ver;
    u64 notify_id = event_work->notify_id;
    u8 opcode = event_work->opcode;

    dout("do_event_work completing %p\n", event);
    event->cb(ver, notify_id, opcode, event->data);
    complete(&event->completion);
    dout("do_event_work completed %p\n", event);
    ceph_osdc_put_event(event);
    kfree(event_work);
}


/*
 * Process osd watch notifications
 */
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
    void *p, *end;
    u8 proto_ver;
    u64 cookie, ver, notify_id;
    u8 opcode;
    struct ceph_osd_event *event;
    struct ceph_osd_event_work *event_work;

    p = msg->front.iov_base;
    end = p + msg->front.iov_len;

    ceph_decode_8_safe(&p, end, proto_ver, bad);
    ceph_decode_8_safe(&p, end, opcode, bad);
    ceph_decode_64_safe(&p, end, cookie, bad);
    ceph_decode_64_safe(&p, end, ver, bad);
    ceph_decode_64_safe(&p, end, notify_id, bad);

    spin_lock(&osdc->event_lock);
    event = __find_event(osdc, cookie);
    if (event) {
        get_event(event);
        if (event->one_shot)
            __remove_event(event);
    }
    spin_unlock(&osdc->event_lock);
    dout("handle_watch_notify cookie %lld ver %lld event %p\n",
         cookie, ver, event);
    if (event) {
        event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
        if (!event_work) {
            dout("ERROR: could not allocate event_work\n");
            goto done_err;
        }
        INIT_WORK(&event_work->work, do_event_work);
        event_work->event = event;
        event_work->ver = ver;
        event_work->notify_id = notify_id;
        event_work->opcode = opcode;
        if (!queue_work(osdc->notify_wq, &event_work->work)) {
            dout("WARNING: failed to queue notify event work\n");
            goto done_err;
        }
    }

    return;

done_err:
    complete(&event->completion);
    ceph_osdc_put_event(event);
    return;

bad:
    pr_err("osdc handle_watch_notify corrupt msg\n");
    return;
}

int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
    int err;

    dout("wait_event %p\n", event);
    err = wait_for_completion_interruptible_timeout(&event->completion,
                                                    timeout * HZ);
    ceph_osdc_put_event(event);
    if (err > 0)
        err = 0;
    dout("wait_event %p returns %d\n", event, err);
    return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);
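
/*
 * Hypothetical watch/notify usage sketch; my_notify_cb and my_data are
 * made-up names, and the notify op setup is elided:
 *
 *    struct ceph_osd_event *event;
 *
 *    ceph_osdc_create_event(osdc, my_notify_cb, 1, my_data, &event);
 *    // ...issue a CEPH_OSD_OP_NOTIFY op carrying event->cookie...
 *    ceph_osdc_wait_event(event, 10);   // timeout in seconds
 */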

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
{
    int rc = 0;

    req->r_request->pages = req->r_pages;
    req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
    req->r_request->bio = req->r_bio;
#endif
    req->r_request->trail = req->r_trail;

    register_request(osdc, req);

    down_read(&osdc->map_sem);
    mutex_lock(&osdc->request_mutex);
    /*
     * a racing kick_requests() may have sent the message for us
     * while we dropped request_mutex above, so only send now if
     * the request still hasn't been touched yet.
     */
    if (req->r_sent == 0) {
        rc = __map_request(osdc, req);
        if (rc < 0) {
            if (nofail) {
                dout("osdc_start_request failed map, "
                     " will retry %lld\n", req->r_tid);
                rc = 0;
            }
            goto out_unlock;
        }
        if (req->r_osd == NULL) {
            dout("send_request %p no up osds in pg\n", req);
            ceph_monc_request_next_osdmap(&osdc->client->monc);
        } else {
            rc = __send_request(osdc, req);
            if (rc) {
                if (nofail) {
                    dout("osdc_start_request failed send, "
                         " will retry %lld\n", req->r_tid);
                    rc = 0;
                } else {
                    __unregister_request(osdc, req);
                }
            }
        }
    }

out_unlock:
    mutex_unlock(&osdc->request_mutex);
    up_read(&osdc->map_sem);
    return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
{
    int rc;

    rc = wait_for_completion_interruptible(&req->r_completion);
    if (rc < 0) {
        mutex_lock(&osdc->request_mutex);
        __cancel_request(req);
        __unregister_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
        complete_request(req);
        dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
        return rc;
    }

    dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
    return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
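
/*
 * The usual synchronous submit-and-wait pattern, as used by
 * ceph_osdc_readpages() and ceph_osdc_writepages() below:
 *
 *    rc = ceph_osdc_start_request(osdc, req, false);
 *    if (!rc)
 *        rc = ceph_osdc_wait_request(osdc, req);
 *    ceph_osdc_put_request(req);
 */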

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
    struct ceph_osd_request *req;
    u64 last_tid, next_tid = 0;

    mutex_lock(&osdc->request_mutex);
    last_tid = osdc->last_tid;
    while (1) {
        req = __lookup_request_ge(osdc, next_tid);
        if (!req)
            break;
        if (req->r_tid > last_tid)
            break;

        next_tid = req->r_tid + 1;
        if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
            continue;

        ceph_osdc_get_request(req);
        mutex_unlock(&osdc->request_mutex);
        dout("sync waiting on tid %llu (last is %llu)\n",
             req->r_tid, last_tid);
        wait_for_completion(&req->r_safe_completion);
        mutex_lock(&osdc->request_mutex);
        ceph_osdc_put_request(req);
    }
    mutex_unlock(&osdc->request_mutex);
    dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
    int err;

    dout("init\n");
    osdc->client = client;
    osdc->osdmap = NULL;
    init_rwsem(&osdc->map_sem);
    init_completion(&osdc->map_waiters);
    osdc->last_requested_map = 0;
    mutex_init(&osdc->request_mutex);
    osdc->last_tid = 0;
    osdc->osds = RB_ROOT;
    INIT_LIST_HEAD(&osdc->osd_lru);
    osdc->requests = RB_ROOT;
    INIT_LIST_HEAD(&osdc->req_lru);
    INIT_LIST_HEAD(&osdc->req_unsent);
    INIT_LIST_HEAD(&osdc->req_notarget);
    INIT_LIST_HEAD(&osdc->req_linger);
    osdc->num_requests = 0;
    INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
    INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
    spin_lock_init(&osdc->event_lock);
    osdc->event_tree = RB_ROOT;
    osdc->event_count = 0;

    schedule_delayed_work(&osdc->osds_timeout_work,
        round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

    err = -ENOMEM;
    osdc->req_mempool = mempool_create_kmalloc_pool(10,
                                sizeof(struct ceph_osd_request));
    if (!osdc->req_mempool)
        goto out;

    err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
                            "osd_op");
    if (err < 0)
        goto out_mempool;
    err = ceph_msgpool_init(&osdc->msgpool_op_reply,
                            OSD_OPREPLY_FRONT_LEN, 10, true,
                            "osd_op_reply");
    if (err < 0)
        goto out_msgpool;

    osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
    if (IS_ERR(osdc->notify_wq)) {
        err = PTR_ERR(osdc->notify_wq);
        osdc->notify_wq = NULL;
        goto out_msgpool;
    }
    return 0;

out_msgpool:
    ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
    mempool_destroy(osdc->req_mempool);
out:
    return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
    flush_workqueue(osdc->notify_wq);
    destroy_workqueue(osdc->notify_wq);
    cancel_delayed_work_sync(&osdc->timeout_work);
    cancel_delayed_work_sync(&osdc->osds_timeout_work);
    if (osdc->osdmap) {
        ceph_osdmap_destroy(osdc->osdmap);
        osdc->osdmap = NULL;
    }
    remove_old_osds(osdc, 1);
    WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
    mempool_destroy(osdc->req_mempool);
    ceph_msgpool_destroy(&osdc->msgpool_op);
    ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 *plen,
                        u32 truncate_seq, u64 truncate_size,
                        struct page **pages, int num_pages, int page_align)
{
    struct ceph_osd_request *req;
    int rc = 0;

    dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
         vino.snap, off, *plen);
    req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                NULL, 0, truncate_seq, truncate_size, NULL,
                                false, 1, page_align);
    if (!req)
        return -ENOMEM;

    /* it may be a short read due to an object boundary */
    req->r_pages = pages;

    dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
         off, *plen, req->r_num_pages, page_align);

    rc = ceph_osdc_start_request(osdc, req, false);
    if (!rc)
        rc = ceph_osdc_wait_request(osdc, req);

    ceph_osdc_put_request(req);
    dout("readpages result %d\n", rc);
    return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);
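
/*
 * Minimal caller-side sketch for ceph_osdc_readpages(); hypothetical,
 * with page allocation and error handling elided:
 *
 *    u64 len = PAGE_SIZE;
 *    struct page *page = __page_cache_alloc(GFP_NOFS);
 *    int rc;
 *
 *    rc = ceph_osdc_readpages(osdc, vino, layout, 0, &len,
 *                             truncate_seq, truncate_size,
 *                             &page, 1, 0);
 *    // rc is the byte count read, possibly short at an object boundary
 */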

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
                         u64 off, u64 len,
                         u32 truncate_seq, u64 truncate_size,
                         struct timespec *mtime,
                         struct page **pages, int num_pages,
                         int flags, int do_sync, bool nofail)
{
    struct ceph_osd_request *req;
    int rc = 0;
    int page_align = off & ~PAGE_MASK;

    BUG_ON(vino.snap != CEPH_NOSNAP);
    req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
                                CEPH_OSD_OP_WRITE,
                                flags | CEPH_OSD_FLAG_ONDISK |
                                        CEPH_OSD_FLAG_WRITE,
                                snapc, do_sync,
                                truncate_seq, truncate_size, mtime,
                                nofail, 1, page_align);
    if (!req)
        return -ENOMEM;

    /* it may be a short write due to an object boundary */
    req->r_pages = pages;
    dout("writepages %llu~%llu (%d pages)\n", off, len,
         req->r_num_pages);

    rc = ceph_osdc_start_request(osdc, req, nofail);
    if (!rc)
        rc = ceph_osdc_wait_request(osdc, req);

    ceph_osdc_put_request(req);
    if (rc == 0)
        rc = len;
    dout("writepages result %d\n", rc);
    return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
    struct ceph_osd *osd = con->private;
    struct ceph_osd_client *osdc;
    int type = le16_to_cpu(msg->hdr.type);

    if (!osd)
        goto out;
    osdc = osd->o_osdc;

    switch (type) {
    case CEPH_MSG_OSD_MAP:
        ceph_osdc_handle_map(osdc, msg);
        break;
    case CEPH_MSG_OSD_OPREPLY:
        handle_reply(osdc, msg, con);
        break;
    case CEPH_MSG_WATCH_NOTIFY:
        handle_watch_notify(osdc, msg);
        break;

    default:
        pr_err("received unknown message type %d %s\n", type,
               ceph_msg_type_name(type));
    }
out:
    ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
    struct ceph_osd *osd = con->private;
    struct ceph_osd_client *osdc = osd->o_osdc;
    struct ceph_msg *m;
    struct ceph_osd_request *req;
    int front = le32_to_cpu(hdr->front_len);
    int data_len = le32_to_cpu(hdr->data_len);
    u64 tid;

    tid = le64_to_cpu(hdr->tid);
    mutex_lock(&osdc->request_mutex);
    req = __lookup_request(osdc, tid);
    if (!req) {
        *skip = 1;
        m = NULL;
        pr_info("get_reply unknown tid %llu from osd%d\n", tid,
                osd->o_osd);
        goto out;
    }

    if (req->r_con_filling_msg) {
        dout("get_reply revoking msg %p from old con %p\n",
             req->r_reply, req->r_con_filling_msg);
        ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
        ceph_con_put(req->r_con_filling_msg);
        req->r_con_filling_msg = NULL;
    }

    if (front > req->r_reply->front.iov_len) {
        pr_warning("get_reply front %d > preallocated %d\n",
                   front, (int)req->r_reply->front.iov_len);
        m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
        if (!m)
            goto out;
        ceph_msg_put(req->r_reply);
        req->r_reply = m;
    }
    m = ceph_msg_get(req->r_reply);

    if (data_len > 0) {
        int want = calc_pages_for(req->r_page_alignment, data_len);

        if (unlikely(req->r_num_pages < want)) {
            pr_warning("tid %lld reply has %d bytes %d pages, we"
                       " had only %d pages ready\n", tid, data_len,
                       want, req->r_num_pages);
            *skip = 1;
            ceph_msg_put(m);
            m = NULL;
            goto out;
        }
        m->pages = req->r_pages;
        m->nr_pages = req->r_num_pages;
        m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
        m->bio = req->r_bio;
#endif
    }
    *skip = 0;
    req->r_con_filling_msg = ceph_con_get(con);
    dout("get_reply tid %lld %p\n", tid, m);

out:
    mutex_unlock(&osdc->request_mutex);
    return m;

}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
    struct ceph_osd *osd = con->private;
    int type = le16_to_cpu(hdr->type);
    int front = le32_to_cpu(hdr->front_len);

    switch (type) {
    case CEPH_MSG_OSD_MAP:
    case CEPH_MSG_WATCH_NOTIFY:
        return ceph_msg_new(type, front, GFP_NOFS);
    case CEPH_MSG_OSD_OPREPLY:
        return get_reply(con, hdr, skip);
    default:
        pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
                osd->o_osd);
        *skip = 1;
        return NULL;
    }
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
    struct ceph_osd *osd = con->private;
    if (get_osd(osd))
        return con;
    return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
    struct ceph_osd *osd = con->private;
    put_osd(osd);
}

/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
                          void **buf, int *len, int *proto,
                          void **reply_buf, int *reply_len, int force_new)
{
    struct ceph_osd *o = con->private;
    struct ceph_osd_client *osdc = o->o_osdc;
    struct ceph_auth_client *ac = osdc->client->monc.auth;
    int ret = 0;

    if (force_new && o->o_authorizer) {
        ac->ops->destroy_authorizer(ac, o->o_authorizer);
        o->o_authorizer = NULL;
    }
    if (o->o_authorizer == NULL) {
        ret = ac->ops->create_authorizer(
            ac, CEPH_ENTITY_TYPE_OSD,
            &o->o_authorizer,
            &o->o_authorizer_buf,
            &o->o_authorizer_buf_len,
            &o->o_authorizer_reply_buf,
            &o->o_authorizer_reply_buf_len);
        if (ret)
            return ret;
    }

    *proto = ac->protocol;
    *buf = o->o_authorizer_buf;
    *len = o->o_authorizer_buf_len;
    *reply_buf = o->o_authorizer_reply_buf;
    *reply_len = o->o_authorizer_reply_buf_len;
    return 0;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
    struct ceph_osd *o = con->private;
    struct ceph_osd_client *osdc = o->o_osdc;
    struct ceph_auth_client *ac = osdc->client->monc.auth;

    return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
    struct ceph_osd *o = con->private;
    struct ceph_osd_client *osdc = o->o_osdc;
    struct ceph_auth_client *ac = osdc->client->monc.auth;

    if (ac->ops->invalidate_authorizer)
        ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

    return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
    .get = get_osd_con,
    .put = put_osd_con,
    .dispatch = dispatch,
    .get_authorizer = get_authorizer,
    .verify_authorizer_reply = verify_authorizer_reply,
    .invalidate_authorizer = invalidate_authorizer,
    .alloc_msg = alloc_msg,
    .fault = osd_reset,
};