GitHub Repository: torvalds/linux
Path: blob/master/net/ceph/osd_client.c
1
// SPDX-License-Identifier: GPL-2.0
2
3
#include <linux/ceph/ceph_debug.h>
4
5
#include <linux/module.h>
6
#include <linux/err.h>
7
#include <linux/highmem.h>
8
#include <linux/mm.h>
9
#include <linux/pagemap.h>
10
#include <linux/slab.h>
11
#include <linux/uaccess.h>
12
#ifdef CONFIG_BLOCK
13
#include <linux/bio.h>
14
#endif
15
16
#include <linux/ceph/ceph_features.h>
17
#include <linux/ceph/libceph.h>
18
#include <linux/ceph/osd_client.h>
19
#include <linux/ceph/messenger.h>
20
#include <linux/ceph/decode.h>
21
#include <linux/ceph/auth.h>
22
#include <linux/ceph/pagelist.h>
23
#include <linux/ceph/striper.h>
24
25
#define OSD_OPREPLY_FRONT_LEN 512
26
27
static struct kmem_cache *ceph_osd_request_cache;
28
29
static const struct ceph_connection_operations osd_con_ops;
30
31
/*
32
* Implement client access to distributed object storage cluster.
33
*
34
* All data objects are stored within a cluster/cloud of OSDs, or
35
* "object storage devices." (Note that Ceph OSDs have _nothing_ to
36
* do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
37
* remote daemons serving up and coordinating consistent and safe
38
* access to storage.
39
*
40
* Cluster membership and the mapping of data objects onto storage devices
41
* are described by the osd map.
42
*
43
* We keep track of pending OSD requests (read, write), resubmit
44
* requests to different OSDs when the cluster topology/data layout
45
* change, or retry the affected requests when the communications
46
* channel with an OSD is reset.
47
*/
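/*
 * Illustrative sketch (not upstream code, error handling omitted): a
 * caller that drives this API directly for a single read op does roughly
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, off, len, 0, 0);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *	req->r_base_oloc.pool = pool_id;
 *	ceph_oid_printf(&req->r_base_oid, "%s", object_name);
 *	ceph_osdc_alloc_messages(req, GFP_NOFS);
 *
 * and then submits with ceph_osdc_start_request(), waits with
 * ceph_osdc_wait_request() and drops its reference with
 * ceph_osdc_put_request() (the submission helpers live further down in
 * this file).
 */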
48
49
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51
static void link_linger(struct ceph_osd *osd,
52
struct ceph_osd_linger_request *lreq);
53
static void unlink_linger(struct ceph_osd *osd,
54
struct ceph_osd_linger_request *lreq);
55
static void clear_backoffs(struct ceph_osd *osd);
56
57
#if 1
58
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
59
{
60
bool wrlocked = true;
61
62
if (unlikely(down_read_trylock(sem))) {
63
wrlocked = false;
64
up_read(sem);
65
}
66
67
return wrlocked;
68
}
69
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
70
{
71
WARN_ON(!rwsem_is_locked(&osdc->lock));
72
}
73
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
74
{
75
WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
76
}
77
static inline void verify_osd_locked(struct ceph_osd *osd)
78
{
79
struct ceph_osd_client *osdc = osd->o_osdc;
80
81
WARN_ON(!(mutex_is_locked(&osd->lock) &&
82
rwsem_is_locked(&osdc->lock)) &&
83
!rwsem_is_wrlocked(&osdc->lock));
84
}
85
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
86
{
87
WARN_ON(!mutex_is_locked(&lreq->lock));
88
}
89
#else
90
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
91
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
92
static inline void verify_osd_locked(struct ceph_osd *osd) { }
93
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
94
#endif
95
96
/*
97
* calculate the mapping of a file extent onto an object, and fill out the
98
* request accordingly. shorten extent as necessary if it crosses an
99
* object boundary.
100
*
101
* fill osd op in request message.
102
*/
103
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
104
u64 *objnum, u64 *objoff, u64 *objlen)
105
{
106
u64 orig_len = *plen;
107
u32 xlen;
108
109
/* object extent? */
110
ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
111
objoff, &xlen);
112
*objlen = xlen;
113
if (*objlen < orig_len) {
114
*plen = *objlen;
115
dout(" skipping last %llu, final file extent %llu~%llu\n",
116
orig_len - *plen, off, *plen);
117
}
118
119
dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
120
return 0;
121
}
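/*
 * Worked example (illustrative, assuming the default non-striped layout:
 * 4M objects with stripe_unit == object_size and stripe_count == 1): a
 * request with off=3M and *plen=6M maps to objnum=0, *objoff=3M and is
 * shortened to *plen=1M at the first object boundary; the caller issues
 * further requests for the remainder.
 */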
122
123
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
124
{
125
memset(osd_data, 0, sizeof (*osd_data));
126
osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127
}
128
129
/*
130
* Consumes @pages if @own_pages is true.
131
*/
132
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
133
struct page **pages, u64 length, u32 alignment,
134
bool pages_from_pool, bool own_pages)
135
{
136
osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
137
osd_data->pages = pages;
138
osd_data->length = length;
139
osd_data->alignment = alignment;
140
osd_data->pages_from_pool = pages_from_pool;
141
osd_data->own_pages = own_pages;
142
}
143
144
/*
145
* Consumes a ref on @pagelist.
146
*/
147
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
148
struct ceph_pagelist *pagelist)
149
{
150
osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
151
osd_data->pagelist = pagelist;
152
}
153
154
#ifdef CONFIG_BLOCK
155
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
156
struct ceph_bio_iter *bio_pos,
157
u32 bio_length)
158
{
159
osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
160
osd_data->bio_pos = *bio_pos;
161
osd_data->bio_length = bio_length;
162
}
163
#endif /* CONFIG_BLOCK */
164
165
static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
166
struct ceph_bvec_iter *bvec_pos,
167
u32 num_bvecs)
168
{
169
osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
170
osd_data->bvec_pos = *bvec_pos;
171
osd_data->num_bvecs = num_bvecs;
172
}
173
174
static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
175
struct iov_iter *iter)
176
{
177
osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
178
osd_data->iter = *iter;
179
}
180
181
static struct ceph_osd_data *
182
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
183
{
184
BUG_ON(which >= osd_req->r_num_ops);
185
186
return &osd_req->r_ops[which].raw_data_in;
187
}
188
189
struct ceph_osd_data *
190
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
191
unsigned int which)
192
{
193
return osd_req_op_data(osd_req, which, extent, osd_data);
194
}
195
EXPORT_SYMBOL(osd_req_op_extent_osd_data);
196
197
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
198
unsigned int which, struct page **pages,
199
u64 length, u32 alignment,
200
bool pages_from_pool, bool own_pages)
201
{
202
struct ceph_osd_data *osd_data;
203
204
osd_data = osd_req_op_raw_data_in(osd_req, which);
205
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
206
pages_from_pool, own_pages);
207
}
208
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
209
210
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
211
unsigned int which, struct page **pages,
212
u64 length, u32 alignment,
213
bool pages_from_pool, bool own_pages)
214
{
215
struct ceph_osd_data *osd_data;
216
217
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
218
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
219
pages_from_pool, own_pages);
220
}
221
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
222
223
#ifdef CONFIG_BLOCK
224
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
225
unsigned int which,
226
struct ceph_bio_iter *bio_pos,
227
u32 bio_length)
228
{
229
struct ceph_osd_data *osd_data;
230
231
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
232
ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
233
}
234
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
235
#endif /* CONFIG_BLOCK */
236
237
void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
238
unsigned int which,
239
struct bio_vec *bvecs, u32 num_bvecs,
240
u32 bytes)
241
{
242
struct ceph_osd_data *osd_data;
243
struct ceph_bvec_iter it = {
244
.bvecs = bvecs,
245
.iter = { .bi_size = bytes },
246
};
247
248
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
249
ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
250
}
251
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
252
253
void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
254
unsigned int which,
255
struct ceph_bvec_iter *bvec_pos)
256
{
257
struct ceph_osd_data *osd_data;
258
259
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
260
ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
261
}
262
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
263
264
/**
265
* osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
266
* @osd_req: The request to set up
267
* @which: Index of the operation in which to set the iter
268
* @iter: The buffer iterator
269
*/
270
void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
271
unsigned int which, struct iov_iter *iter)
272
{
273
struct ceph_osd_data *osd_data;
274
275
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
276
ceph_osd_iter_init(osd_data, iter);
277
}
278
EXPORT_SYMBOL(osd_req_op_extent_osd_iter);
279
280
static void osd_req_op_cls_request_info_pagelist(
281
struct ceph_osd_request *osd_req,
282
unsigned int which, struct ceph_pagelist *pagelist)
283
{
284
struct ceph_osd_data *osd_data;
285
286
osd_data = osd_req_op_data(osd_req, which, cls, request_info);
287
ceph_osd_data_pagelist_init(osd_data, pagelist);
288
}
289
290
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
291
unsigned int which, struct page **pages, u64 length,
292
u32 alignment, bool pages_from_pool, bool own_pages)
293
{
294
struct ceph_osd_data *osd_data;
295
296
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
297
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
298
pages_from_pool, own_pages);
299
osd_req->r_ops[which].cls.indata_len += length;
300
osd_req->r_ops[which].indata_len += length;
301
}
302
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
303
304
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
305
unsigned int which,
306
struct bio_vec *bvecs, u32 num_bvecs,
307
u32 bytes)
308
{
309
struct ceph_osd_data *osd_data;
310
struct ceph_bvec_iter it = {
311
.bvecs = bvecs,
312
.iter = { .bi_size = bytes },
313
};
314
315
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
316
ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
317
osd_req->r_ops[which].cls.indata_len += bytes;
318
osd_req->r_ops[which].indata_len += bytes;
319
}
320
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
321
322
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
323
unsigned int which, struct page **pages, u64 length,
324
u32 alignment, bool pages_from_pool, bool own_pages)
325
{
326
struct ceph_osd_data *osd_data;
327
328
osd_data = osd_req_op_data(osd_req, which, cls, response_data);
329
ceph_osd_data_pages_init(osd_data, pages, length, alignment,
330
pages_from_pool, own_pages);
331
}
332
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
333
334
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
335
{
336
switch (osd_data->type) {
337
case CEPH_OSD_DATA_TYPE_NONE:
338
return 0;
339
case CEPH_OSD_DATA_TYPE_PAGES:
340
return osd_data->length;
341
case CEPH_OSD_DATA_TYPE_PAGELIST:
342
return (u64)osd_data->pagelist->length;
343
#ifdef CONFIG_BLOCK
344
case CEPH_OSD_DATA_TYPE_BIO:
345
return (u64)osd_data->bio_length;
346
#endif /* CONFIG_BLOCK */
347
case CEPH_OSD_DATA_TYPE_BVECS:
348
return osd_data->bvec_pos.iter.bi_size;
349
case CEPH_OSD_DATA_TYPE_ITER:
350
return iov_iter_count(&osd_data->iter);
351
default:
352
WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
353
return 0;
354
}
355
}
356
357
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
358
{
359
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
360
int num_pages;
361
362
num_pages = calc_pages_for((u64)osd_data->alignment,
363
(u64)osd_data->length);
364
ceph_release_page_vector(osd_data->pages, num_pages);
365
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
366
ceph_pagelist_release(osd_data->pagelist);
367
}
368
ceph_osd_data_init(osd_data);
369
}
370
371
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
372
unsigned int which)
373
{
374
struct ceph_osd_req_op *op;
375
376
BUG_ON(which >= osd_req->r_num_ops);
377
op = &osd_req->r_ops[which];
378
379
switch (op->op) {
380
case CEPH_OSD_OP_READ:
381
case CEPH_OSD_OP_SPARSE_READ:
382
case CEPH_OSD_OP_WRITE:
383
case CEPH_OSD_OP_WRITEFULL:
384
kfree(op->extent.sparse_ext);
385
ceph_osd_data_release(&op->extent.osd_data);
386
break;
387
case CEPH_OSD_OP_CALL:
388
ceph_osd_data_release(&op->cls.request_info);
389
ceph_osd_data_release(&op->cls.request_data);
390
ceph_osd_data_release(&op->cls.response_data);
391
break;
392
case CEPH_OSD_OP_SETXATTR:
393
case CEPH_OSD_OP_CMPXATTR:
394
ceph_osd_data_release(&op->xattr.osd_data);
395
break;
396
case CEPH_OSD_OP_STAT:
397
ceph_osd_data_release(&op->raw_data_in);
398
break;
399
case CEPH_OSD_OP_NOTIFY_ACK:
400
ceph_osd_data_release(&op->notify_ack.request_data);
401
break;
402
case CEPH_OSD_OP_NOTIFY:
403
ceph_osd_data_release(&op->notify.request_data);
404
ceph_osd_data_release(&op->notify.response_data);
405
break;
406
case CEPH_OSD_OP_LIST_WATCHERS:
407
ceph_osd_data_release(&op->list_watchers.response_data);
408
break;
409
case CEPH_OSD_OP_COPY_FROM2:
410
ceph_osd_data_release(&op->copy_from.osd_data);
411
break;
412
default:
413
break;
414
}
415
}
416
417
/*
418
* Assumes @t is zero-initialized.
419
*/
420
static void target_init(struct ceph_osd_request_target *t)
421
{
422
ceph_oid_init(&t->base_oid);
423
ceph_oloc_init(&t->base_oloc);
424
ceph_oid_init(&t->target_oid);
425
ceph_oloc_init(&t->target_oloc);
426
427
ceph_osds_init(&t->acting);
428
ceph_osds_init(&t->up);
429
t->size = -1;
430
t->min_size = -1;
431
432
t->osd = CEPH_HOMELESS_OSD;
433
}
434
435
static void target_copy(struct ceph_osd_request_target *dest,
436
const struct ceph_osd_request_target *src)
437
{
438
ceph_oid_copy(&dest->base_oid, &src->base_oid);
439
ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
440
ceph_oid_copy(&dest->target_oid, &src->target_oid);
441
ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
442
443
dest->pgid = src->pgid; /* struct */
444
dest->spgid = src->spgid; /* struct */
445
dest->pg_num = src->pg_num;
446
dest->pg_num_mask = src->pg_num_mask;
447
ceph_osds_copy(&dest->acting, &src->acting);
448
ceph_osds_copy(&dest->up, &src->up);
449
dest->size = src->size;
450
dest->min_size = src->min_size;
451
dest->sort_bitwise = src->sort_bitwise;
452
dest->recovery_deletes = src->recovery_deletes;
453
454
dest->flags = src->flags;
455
dest->used_replica = src->used_replica;
456
dest->paused = src->paused;
457
458
dest->epoch = src->epoch;
459
dest->last_force_resend = src->last_force_resend;
460
461
dest->osd = src->osd;
462
}
463
464
static void target_destroy(struct ceph_osd_request_target *t)
465
{
466
ceph_oid_destroy(&t->base_oid);
467
ceph_oloc_destroy(&t->base_oloc);
468
ceph_oid_destroy(&t->target_oid);
469
ceph_oloc_destroy(&t->target_oloc);
470
}
471
472
/*
473
* requests
474
*/
475
static void request_release_checks(struct ceph_osd_request *req)
476
{
477
WARN_ON(!RB_EMPTY_NODE(&req->r_node));
478
WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
479
WARN_ON(!list_empty(&req->r_private_item));
480
WARN_ON(req->r_osd);
481
}
482
483
static void ceph_osdc_release_request(struct kref *kref)
484
{
485
struct ceph_osd_request *req = container_of(kref,
486
struct ceph_osd_request, r_kref);
487
unsigned int which;
488
489
dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
490
req->r_request, req->r_reply);
491
request_release_checks(req);
492
493
if (req->r_request)
494
ceph_msg_put(req->r_request);
495
if (req->r_reply)
496
ceph_msg_put(req->r_reply);
497
498
for (which = 0; which < req->r_num_ops; which++)
499
osd_req_op_data_release(req, which);
500
501
target_destroy(&req->r_t);
502
ceph_put_snap_context(req->r_snapc);
503
504
if (req->r_mempool)
505
mempool_free(req, req->r_osdc->req_mempool);
506
else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
507
kmem_cache_free(ceph_osd_request_cache, req);
508
else
509
kfree(req);
510
}
511
512
void ceph_osdc_get_request(struct ceph_osd_request *req)
513
{
514
dout("%s %p (was %d)\n", __func__, req,
515
kref_read(&req->r_kref));
516
kref_get(&req->r_kref);
517
}
518
EXPORT_SYMBOL(ceph_osdc_get_request);
519
520
void ceph_osdc_put_request(struct ceph_osd_request *req)
521
{
522
if (req) {
523
dout("%s %p (was %d)\n", __func__, req,
524
kref_read(&req->r_kref));
525
kref_put(&req->r_kref, ceph_osdc_release_request);
526
}
527
}
528
EXPORT_SYMBOL(ceph_osdc_put_request);
529
530
static void request_init(struct ceph_osd_request *req)
531
{
532
/* req only, each op is zeroed in osd_req_op_init() */
533
memset(req, 0, sizeof(*req));
534
535
kref_init(&req->r_kref);
536
init_completion(&req->r_completion);
537
RB_CLEAR_NODE(&req->r_node);
538
RB_CLEAR_NODE(&req->r_mc_node);
539
INIT_LIST_HEAD(&req->r_private_item);
540
541
target_init(&req->r_t);
542
}
543
544
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
545
struct ceph_snap_context *snapc,
546
unsigned int num_ops,
547
bool use_mempool,
548
gfp_t gfp_flags)
549
{
550
struct ceph_osd_request *req;
551
552
if (use_mempool) {
553
BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
554
req = mempool_alloc(osdc->req_mempool, gfp_flags);
555
} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
556
req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
557
} else {
558
BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
559
req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
560
}
561
if (unlikely(!req))
562
return NULL;
563
564
request_init(req);
565
req->r_osdc = osdc;
566
req->r_mempool = use_mempool;
567
req->r_num_ops = num_ops;
568
req->r_snapid = CEPH_NOSNAP;
569
req->r_snapc = ceph_get_snap_context(snapc);
570
571
dout("%s req %p\n", __func__, req);
572
return req;
573
}
574
EXPORT_SYMBOL(ceph_osdc_alloc_request);
575
576
static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
577
{
578
return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
579
}
580
581
static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
582
int num_request_data_items,
583
int num_reply_data_items)
584
{
585
struct ceph_osd_client *osdc = req->r_osdc;
586
struct ceph_msg *msg;
587
int msg_size;
588
589
WARN_ON(req->r_request || req->r_reply);
590
WARN_ON(ceph_oid_empty(&req->r_base_oid));
591
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
592
593
/* create request message */
594
msg_size = CEPH_ENCODING_START_BLK_LEN +
595
CEPH_PGID_ENCODING_LEN + 1; /* spgid */
596
msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
597
msg_size += CEPH_ENCODING_START_BLK_LEN +
598
sizeof(struct ceph_osd_reqid); /* reqid */
599
msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
600
msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
601
msg_size += CEPH_ENCODING_START_BLK_LEN +
602
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
603
msg_size += 4 + req->r_base_oid.name_len; /* oid */
604
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
605
msg_size += 8; /* snapid */
606
msg_size += 8; /* snap_seq */
607
msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
608
msg_size += 4 + 8; /* retry_attempt, features */
609
610
if (req->r_mempool)
611
msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
612
num_request_data_items);
613
else
614
msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
615
num_request_data_items, gfp, true);
616
if (!msg)
617
return -ENOMEM;
618
619
memset(msg->front.iov_base, 0, msg->front.iov_len);
620
req->r_request = msg;
621
622
/* create reply message */
623
msg_size = OSD_OPREPLY_FRONT_LEN;
624
msg_size += req->r_base_oid.name_len;
625
msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
626
627
if (req->r_mempool)
628
msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
629
num_reply_data_items);
630
else
631
msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
632
num_reply_data_items, gfp, true);
633
if (!msg)
634
return -ENOMEM;
635
636
req->r_reply = msg;
637
638
return 0;
639
}
640
641
static bool osd_req_opcode_valid(u16 opcode)
642
{
643
switch (opcode) {
644
#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
645
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
646
#undef GENERATE_CASE
647
default:
648
return false;
649
}
650
}
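/*
 * __CEPH_FORALL_OSD_OPS() (include/linux/ceph/rados.h) is an X-macro over
 * every (op, opcode, str) tuple the client knows about, so the
 * GENERATE_CASE expansion above produces one
 *
 *	case CEPH_OSD_OP_READ: return true;
 *
 * style arm per supported opcode; anything else falls through to the
 * default and is rejected.
 */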
651
652
static void get_num_data_items(struct ceph_osd_request *req,
653
int *num_request_data_items,
654
int *num_reply_data_items)
655
{
656
struct ceph_osd_req_op *op;
657
658
*num_request_data_items = 0;
659
*num_reply_data_items = 0;
660
661
for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
662
switch (op->op) {
663
/* request */
664
case CEPH_OSD_OP_WRITE:
665
case CEPH_OSD_OP_WRITEFULL:
666
case CEPH_OSD_OP_SETXATTR:
667
case CEPH_OSD_OP_CMPXATTR:
668
case CEPH_OSD_OP_NOTIFY_ACK:
669
case CEPH_OSD_OP_COPY_FROM2:
670
*num_request_data_items += 1;
671
break;
672
673
/* reply */
674
case CEPH_OSD_OP_STAT:
675
case CEPH_OSD_OP_READ:
676
case CEPH_OSD_OP_SPARSE_READ:
677
case CEPH_OSD_OP_LIST_WATCHERS:
678
*num_reply_data_items += 1;
679
break;
680
681
/* both */
682
case CEPH_OSD_OP_NOTIFY:
683
*num_request_data_items += 1;
684
*num_reply_data_items += 1;
685
break;
686
case CEPH_OSD_OP_CALL:
687
*num_request_data_items += 2;
688
*num_reply_data_items += 1;
689
break;
690
691
default:
692
WARN_ON(!osd_req_opcode_valid(op->op));
693
break;
694
}
695
}
696
}
697
698
/*
699
* oid, oloc and OSD op opcode(s) must be filled in before this function
700
* is called.
701
*/
702
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
703
{
704
int num_request_data_items, num_reply_data_items;
705
706
get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
707
return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
708
num_reply_data_items);
709
}
710
EXPORT_SYMBOL(ceph_osdc_alloc_messages);
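/*
 * Both the outgoing CEPH_MSG_OSD_OP and the incoming CEPH_MSG_OSD_OPREPLY
 * front buffers are preallocated above; the reply path later hands back
 * req->r_reply instead of allocating a fresh message.  Callers that know
 * the request/reply data item split up front (e.g. ceph_osdc_new_request()
 * below for mixed multi-op requests) call __ceph_osdc_alloc_messages()
 * directly.
 */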
711
712
/*
713
* This is an osd op init function for opcodes that have no data or
714
* other information associated with them. It also serves as a
715
* common init routine for all the other init functions, below.
716
*/
717
struct ceph_osd_req_op *
718
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
719
u16 opcode, u32 flags)
720
{
721
struct ceph_osd_req_op *op;
722
723
BUG_ON(which >= osd_req->r_num_ops);
724
BUG_ON(!osd_req_opcode_valid(opcode));
725
726
op = &osd_req->r_ops[which];
727
memset(op, 0, sizeof (*op));
728
op->op = opcode;
729
op->flags = flags;
730
731
return op;
732
}
733
EXPORT_SYMBOL(osd_req_op_init);
734
735
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
736
unsigned int which, u16 opcode,
737
u64 offset, u64 length,
738
u64 truncate_size, u32 truncate_seq)
739
{
740
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
741
opcode, 0);
742
size_t payload_len = 0;
743
744
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
745
opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
746
opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
747
748
op->extent.offset = offset;
749
op->extent.length = length;
750
op->extent.truncate_size = truncate_size;
751
op->extent.truncate_seq = truncate_seq;
752
if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
753
payload_len += length;
754
755
op->indata_len = payload_len;
756
}
757
EXPORT_SYMBOL(osd_req_op_extent_init);
758
759
void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
760
unsigned int which, u64 length)
761
{
762
struct ceph_osd_req_op *op;
763
u64 previous;
764
765
BUG_ON(which >= osd_req->r_num_ops);
766
op = &osd_req->r_ops[which];
767
previous = op->extent.length;
768
769
if (length == previous)
770
return; /* Nothing to do */
771
BUG_ON(length > previous);
772
773
op->extent.length = length;
774
if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
775
op->indata_len -= previous - length;
776
}
777
EXPORT_SYMBOL(osd_req_op_extent_update);
778
779
void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
780
unsigned int which, u64 offset_inc)
781
{
782
struct ceph_osd_req_op *op, *prev_op;
783
784
BUG_ON(which + 1 >= osd_req->r_num_ops);
785
786
prev_op = &osd_req->r_ops[which];
787
op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
788
/* dup previous one */
789
op->indata_len = prev_op->indata_len;
790
op->outdata_len = prev_op->outdata_len;
791
op->extent = prev_op->extent;
792
/* adjust offset */
793
op->extent.offset += offset_inc;
794
op->extent.length -= offset_inc;
795
796
if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
797
op->indata_len -= offset_inc;
798
}
799
EXPORT_SYMBOL(osd_req_op_extent_dup_last);
800
801
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
802
const char *class, const char *method)
803
{
804
struct ceph_osd_req_op *op;
805
struct ceph_pagelist *pagelist;
806
size_t payload_len = 0;
807
size_t size;
808
int ret;
809
810
op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
811
812
pagelist = ceph_pagelist_alloc(GFP_NOFS);
813
if (!pagelist)
814
return -ENOMEM;
815
816
op->cls.class_name = class;
817
size = strlen(class);
818
BUG_ON(size > (size_t) U8_MAX);
819
op->cls.class_len = size;
820
ret = ceph_pagelist_append(pagelist, class, size);
821
if (ret)
822
goto err_pagelist_free;
823
payload_len += size;
824
825
op->cls.method_name = method;
826
size = strlen(method);
827
BUG_ON(size > (size_t) U8_MAX);
828
op->cls.method_len = size;
829
ret = ceph_pagelist_append(pagelist, method, size);
830
if (ret)
831
goto err_pagelist_free;
832
payload_len += size;
833
834
osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
835
op->indata_len = payload_len;
836
return 0;
837
838
err_pagelist_free:
839
ceph_pagelist_release(pagelist);
840
return ret;
841
}
842
EXPORT_SYMBOL(osd_req_op_cls_init);
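/*
 * Illustrative use (class/method names are just examples): an object
 * class call is typically built as
 *
 *	ret = osd_req_op_cls_init(req, 0, "lock", "lock");
 *	osd_req_op_cls_request_data_pages(req, 0, in_pages, in_len, 0,
 *					  false, false);
 *	osd_req_op_cls_response_data_pages(req, 0, out_pages, out_len, 0,
 *					   false, false);
 *
 * Note that only the class/method string pointers are stored in the op
 * (the bytes themselves are appended to the request_info pagelist), so
 * callers conventionally pass string literals.
 */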
843
844
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
845
u16 opcode, const char *name, const void *value,
846
size_t size, u8 cmp_op, u8 cmp_mode)
847
{
848
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
849
opcode, 0);
850
struct ceph_pagelist *pagelist;
851
size_t payload_len;
852
int ret;
853
854
BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
855
856
pagelist = ceph_pagelist_alloc(GFP_NOFS);
857
if (!pagelist)
858
return -ENOMEM;
859
860
payload_len = strlen(name);
861
op->xattr.name_len = payload_len;
862
ret = ceph_pagelist_append(pagelist, name, payload_len);
863
if (ret)
864
goto err_pagelist_free;
865
866
op->xattr.value_len = size;
867
ret = ceph_pagelist_append(pagelist, value, size);
868
if (ret)
869
goto err_pagelist_free;
870
payload_len += size;
871
872
op->xattr.cmp_op = cmp_op;
873
op->xattr.cmp_mode = cmp_mode;
874
875
ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
876
op->indata_len = payload_len;
877
return 0;
878
879
err_pagelist_free:
880
ceph_pagelist_release(pagelist);
881
return ret;
882
}
883
EXPORT_SYMBOL(osd_req_op_xattr_init);
884
885
/*
886
* @watch_opcode: CEPH_OSD_WATCH_OP_*
887
*/
888
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
889
u8 watch_opcode, u64 cookie, u32 gen)
890
{
891
struct ceph_osd_req_op *op;
892
893
op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
894
op->watch.cookie = cookie;
895
op->watch.op = watch_opcode;
896
op->watch.gen = gen;
897
}
898
899
/*
900
* prot_ver, timeout and notify payload (may be empty) should already be
901
* encoded in @request_pl
902
*/
903
static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
904
u64 cookie, struct ceph_pagelist *request_pl)
905
{
906
struct ceph_osd_req_op *op;
907
908
op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
909
op->notify.cookie = cookie;
910
911
ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
912
op->indata_len = request_pl->length;
913
}
914
915
/*
916
* @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
917
*/
918
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
919
unsigned int which,
920
u64 expected_object_size,
921
u64 expected_write_size,
922
u32 flags)
923
{
924
struct ceph_osd_req_op *op;
925
926
op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
927
op->alloc_hint.expected_object_size = expected_object_size;
928
op->alloc_hint.expected_write_size = expected_write_size;
929
op->alloc_hint.flags = flags;
930
931
/*
932
* CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
933
* not worth a feature bit. Set FAILOK per-op flag to make
934
* sure older osds don't trip over an unsupported opcode.
935
*/
936
op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
937
}
938
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
939
940
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
941
struct ceph_osd_data *osd_data)
942
{
943
u64 length = ceph_osd_data_length(osd_data);
944
945
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
946
BUG_ON(length > (u64) SIZE_MAX);
947
if (length)
948
ceph_msg_data_add_pages(msg, osd_data->pages,
949
length, osd_data->alignment, false);
950
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
951
BUG_ON(!length);
952
ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
953
#ifdef CONFIG_BLOCK
954
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
955
ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
956
#endif
957
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
958
ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
959
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
960
ceph_msg_data_add_iter(msg, &osd_data->iter);
961
} else {
962
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
963
}
964
}
965
966
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
967
const struct ceph_osd_req_op *src)
968
{
969
switch (src->op) {
970
case CEPH_OSD_OP_STAT:
971
break;
972
case CEPH_OSD_OP_READ:
973
case CEPH_OSD_OP_SPARSE_READ:
974
case CEPH_OSD_OP_WRITE:
975
case CEPH_OSD_OP_WRITEFULL:
976
case CEPH_OSD_OP_ZERO:
977
case CEPH_OSD_OP_TRUNCATE:
978
dst->extent.offset = cpu_to_le64(src->extent.offset);
979
dst->extent.length = cpu_to_le64(src->extent.length);
980
dst->extent.truncate_size =
981
cpu_to_le64(src->extent.truncate_size);
982
dst->extent.truncate_seq =
983
cpu_to_le32(src->extent.truncate_seq);
984
break;
985
case CEPH_OSD_OP_CALL:
986
dst->cls.class_len = src->cls.class_len;
987
dst->cls.method_len = src->cls.method_len;
988
dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
989
break;
990
case CEPH_OSD_OP_WATCH:
991
dst->watch.cookie = cpu_to_le64(src->watch.cookie);
992
dst->watch.ver = cpu_to_le64(0);
993
dst->watch.op = src->watch.op;
994
dst->watch.gen = cpu_to_le32(src->watch.gen);
995
break;
996
case CEPH_OSD_OP_NOTIFY_ACK:
997
break;
998
case CEPH_OSD_OP_NOTIFY:
999
dst->notify.cookie = cpu_to_le64(src->notify.cookie);
1000
break;
1001
case CEPH_OSD_OP_LIST_WATCHERS:
1002
break;
1003
case CEPH_OSD_OP_SETALLOCHINT:
1004
dst->alloc_hint.expected_object_size =
1005
cpu_to_le64(src->alloc_hint.expected_object_size);
1006
dst->alloc_hint.expected_write_size =
1007
cpu_to_le64(src->alloc_hint.expected_write_size);
1008
dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
1009
break;
1010
case CEPH_OSD_OP_SETXATTR:
1011
case CEPH_OSD_OP_CMPXATTR:
1012
dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
1013
dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
1014
dst->xattr.cmp_op = src->xattr.cmp_op;
1015
dst->xattr.cmp_mode = src->xattr.cmp_mode;
1016
break;
1017
case CEPH_OSD_OP_CREATE:
1018
case CEPH_OSD_OP_DELETE:
1019
break;
1020
case CEPH_OSD_OP_COPY_FROM2:
1021
dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1022
dst->copy_from.src_version =
1023
cpu_to_le64(src->copy_from.src_version);
1024
dst->copy_from.flags = src->copy_from.flags;
1025
dst->copy_from.src_fadvise_flags =
1026
cpu_to_le32(src->copy_from.src_fadvise_flags);
1027
break;
1028
case CEPH_OSD_OP_ASSERT_VER:
1029
dst->assert_ver.unused = cpu_to_le64(0);
1030
dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
1031
break;
1032
default:
1033
pr_err("unsupported osd opcode %s\n",
1034
ceph_osd_op_name(src->op));
1035
WARN_ON(1);
1036
1037
return 0;
1038
}
1039
1040
dst->op = cpu_to_le16(src->op);
1041
dst->flags = cpu_to_le32(src->flags);
1042
dst->payload_len = cpu_to_le32(src->indata_len);
1043
1044
return src->indata_len;
1045
}
1046
1047
/*
1048
* build new request AND message, calculate layout, and adjust file
1049
* extent as needed.
1050
*
1051
* if the file was recently truncated, we include information about its
1052
* old and new size so that the object can be updated appropriately. (we
1053
* avoid synchronously deleting truncated objects because it's slow.)
1054
*/
1055
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1056
struct ceph_file_layout *layout,
1057
struct ceph_vino vino,
1058
u64 off, u64 *plen,
1059
unsigned int which, int num_ops,
1060
int opcode, int flags,
1061
struct ceph_snap_context *snapc,
1062
u32 truncate_seq,
1063
u64 truncate_size,
1064
bool use_mempool)
1065
{
1066
struct ceph_osd_request *req;
1067
u64 objnum = 0;
1068
u64 objoff = 0;
1069
u64 objlen = 0;
1070
int r;
1071
1072
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
1073
opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
1074
opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
1075
opcode != CEPH_OSD_OP_SPARSE_READ);
1076
1077
req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
1078
GFP_NOFS);
1079
if (!req) {
1080
r = -ENOMEM;
1081
goto fail;
1082
}
1083
1084
/* calculate max write size */
1085
r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
1086
if (r)
1087
goto fail;
1088
1089
if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
1090
osd_req_op_init(req, which, opcode, 0);
1091
} else {
1092
u32 object_size = layout->object_size;
1093
u32 object_base = off - objoff;
1094
if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
1095
if (truncate_size <= object_base) {
1096
truncate_size = 0;
1097
} else {
1098
truncate_size -= object_base;
1099
if (truncate_size > object_size)
1100
truncate_size = object_size;
1101
}
1102
}
1103
osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1104
truncate_size, truncate_seq);
1105
}
1106
1107
req->r_base_oloc.pool = layout->pool_id;
1108
req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1109
ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1110
req->r_flags = flags | osdc->client->options->read_from_replica;
1111
1112
req->r_snapid = vino.snap;
1113
if (flags & CEPH_OSD_FLAG_WRITE)
1114
req->r_data_offset = off;
1115
1116
if (num_ops > 1) {
1117
int num_req_ops, num_rep_ops;
1118
1119
/*
1120
* If this is a multi-op write request, assume that we'll need
1121
* request ops. If it's a multi-op read then assume we'll need
1122
* reply ops. Anything else and call it -EINVAL.
1123
*/
1124
if (flags & CEPH_OSD_FLAG_WRITE) {
1125
num_req_ops = num_ops;
1126
num_rep_ops = 0;
1127
} else if (flags & CEPH_OSD_FLAG_READ) {
1128
num_req_ops = 0;
1129
num_rep_ops = num_ops;
1130
} else {
1131
r = -EINVAL;
1132
goto fail;
1133
}
1134
1135
r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
1136
num_rep_ops);
1137
} else {
1138
r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1139
}
1140
if (r)
1141
goto fail;
1142
1143
return req;
1144
1145
fail:
1146
ceph_osdc_put_request(req);
1147
return ERR_PTR(r);
1148
}
1149
EXPORT_SYMBOL(ceph_osdc_new_request);
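/*
 * Minimal usage sketch (illustrative only; variable names are assumed, not
 * taken from any particular caller):
 *
 *	u64 len = count;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, truncate_seq, truncate_size, false);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *
 * On return, len has been clipped to the portion of the file extent that
 * this single request actually covers (see calc_layout() above).
 */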
1150
1151
int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
1152
{
1153
WARN_ON(op->op != CEPH_OSD_OP_SPARSE_READ);
1154
1155
op->extent.sparse_ext_cnt = cnt;
1156
op->extent.sparse_ext = kmalloc_array(cnt,
1157
sizeof(*op->extent.sparse_ext),
1158
GFP_NOFS);
1159
if (!op->extent.sparse_ext)
1160
return -ENOMEM;
1161
return 0;
1162
}
1163
EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);
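/*
 * Callers issuing a CEPH_OSD_OP_SPARSE_READ size the extent map up front,
 * e.g. (illustrative):
 *
 *	ret = __ceph_alloc_sparse_ext_map(&req->r_ops[0], cnt);
 *	if (ret)
 *		return ret;
 *
 * The array is freed along with the op in osd_req_op_data_release().
 */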
1164
1165
/*
1166
* We keep osd requests in an rbtree, sorted by ->r_tid.
1167
*/
1168
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1169
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
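/*
 * DEFINE_RB_FUNCS() (linux/ceph/libceph.h) expands to the
 * lookup_request()/insert_request()/erase_request() helpers keyed by
 * r_tid that the rest of this file uses; the request_mc variant provides
 * the same helpers hanging off r_mc_node (used for map checks).
 */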
1170
1171
/*
1172
* Call @fn on each OSD request as long as @fn returns 0.
1173
*/
1174
static void for_each_request(struct ceph_osd_client *osdc,
1175
int (*fn)(struct ceph_osd_request *req, void *arg),
1176
void *arg)
1177
{
1178
struct rb_node *n, *p;
1179
1180
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1181
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1182
1183
for (p = rb_first(&osd->o_requests); p; ) {
1184
struct ceph_osd_request *req =
1185
rb_entry(p, struct ceph_osd_request, r_node);
1186
1187
p = rb_next(p);
1188
if (fn(req, arg))
1189
return;
1190
}
1191
}
1192
1193
for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1194
struct ceph_osd_request *req =
1195
rb_entry(p, struct ceph_osd_request, r_node);
1196
1197
p = rb_next(p);
1198
if (fn(req, arg))
1199
return;
1200
}
1201
}
1202
1203
static bool osd_homeless(struct ceph_osd *osd)
1204
{
1205
return osd->o_osd == CEPH_HOMELESS_OSD;
1206
}
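/*
 * The "homeless" OSD (o_osd == CEPH_HOMELESS_OSD) is a placeholder session
 * embedded in struct ceph_osd_client: requests and linger requests whose
 * target cannot currently be mapped to a live OSD are parked on it (see
 * close_osd() and calc_target()) and re-linked to a real session once a
 * new osdmap makes them routable again.
 */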
1207
1208
static bool osd_registered(struct ceph_osd *osd)
1209
{
1210
verify_osdc_locked(osd->o_osdc);
1211
1212
return !RB_EMPTY_NODE(&osd->o_node);
1213
}
1214
1215
/*
1216
* Assumes @osd is zero-initialized.
1217
*/
1218
static void osd_init(struct ceph_osd *osd)
1219
{
1220
refcount_set(&osd->o_ref, 1);
1221
RB_CLEAR_NODE(&osd->o_node);
1222
spin_lock_init(&osd->o_requests_lock);
1223
osd->o_requests = RB_ROOT;
1224
osd->o_linger_requests = RB_ROOT;
1225
osd->o_backoff_mappings = RB_ROOT;
1226
osd->o_backoffs_by_id = RB_ROOT;
1227
INIT_LIST_HEAD(&osd->o_osd_lru);
1228
INIT_LIST_HEAD(&osd->o_keepalive_item);
1229
osd->o_incarnation = 1;
1230
mutex_init(&osd->lock);
1231
}
1232
1233
static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
1234
{
1235
kfree(sr->sr_extent);
1236
memset(sr, '\0', sizeof(*sr));
1237
sr->sr_state = CEPH_SPARSE_READ_HDR;
1238
}
1239
1240
static void osd_cleanup(struct ceph_osd *osd)
1241
{
1242
WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1243
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1244
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1245
WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1246
WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1247
WARN_ON(!list_empty(&osd->o_osd_lru));
1248
WARN_ON(!list_empty(&osd->o_keepalive_item));
1249
1250
ceph_init_sparse_read(&osd->o_sparse_read);
1251
1252
if (osd->o_auth.authorizer) {
1253
WARN_ON(osd_homeless(osd));
1254
ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1255
}
1256
}
1257
1258
/*
1259
* Track open sessions with osds.
1260
*/
1261
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1262
{
1263
struct ceph_osd *osd;
1264
1265
WARN_ON(onum == CEPH_HOMELESS_OSD);
1266
1267
osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1268
osd_init(osd);
1269
osd->o_osdc = osdc;
1270
osd->o_osd = onum;
1271
osd->o_sparse_op_idx = -1;
1272
1273
ceph_init_sparse_read(&osd->o_sparse_read);
1274
1275
ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1276
1277
return osd;
1278
}
1279
1280
static struct ceph_osd *get_osd(struct ceph_osd *osd)
1281
{
1282
if (refcount_inc_not_zero(&osd->o_ref)) {
1283
dout("get_osd %p -> %d\n", osd, refcount_read(&osd->o_ref));
1284
return osd;
1285
} else {
1286
dout("get_osd %p FAIL\n", osd);
1287
return NULL;
1288
}
1289
}
1290
1291
static void put_osd(struct ceph_osd *osd)
1292
{
1293
dout("put_osd %p -> %d\n", osd, refcount_read(&osd->o_ref) - 1);
1294
if (refcount_dec_and_test(&osd->o_ref)) {
1295
osd_cleanup(osd);
1296
kfree(osd);
1297
}
1298
}
1299
1300
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1301
1302
static void __move_osd_to_lru(struct ceph_osd *osd)
1303
{
1304
struct ceph_osd_client *osdc = osd->o_osdc;
1305
1306
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1307
BUG_ON(!list_empty(&osd->o_osd_lru));
1308
1309
spin_lock(&osdc->osd_lru_lock);
1310
list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1311
spin_unlock(&osdc->osd_lru_lock);
1312
1313
osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1314
}
1315
1316
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1317
{
1318
if (RB_EMPTY_ROOT(&osd->o_requests) &&
1319
RB_EMPTY_ROOT(&osd->o_linger_requests))
1320
__move_osd_to_lru(osd);
1321
}
1322
1323
static void __remove_osd_from_lru(struct ceph_osd *osd)
1324
{
1325
struct ceph_osd_client *osdc = osd->o_osdc;
1326
1327
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1328
1329
spin_lock(&osdc->osd_lru_lock);
1330
if (!list_empty(&osd->o_osd_lru))
1331
list_del_init(&osd->o_osd_lru);
1332
spin_unlock(&osdc->osd_lru_lock);
1333
}
1334
1335
/*
1336
* Close the connection and assign any leftover requests to the
1337
* homeless session.
1338
*/
1339
static void close_osd(struct ceph_osd *osd)
1340
{
1341
struct ceph_osd_client *osdc = osd->o_osdc;
1342
struct rb_node *n;
1343
1344
verify_osdc_wrlocked(osdc);
1345
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1346
1347
ceph_con_close(&osd->o_con);
1348
1349
for (n = rb_first(&osd->o_requests); n; ) {
1350
struct ceph_osd_request *req =
1351
rb_entry(n, struct ceph_osd_request, r_node);
1352
1353
n = rb_next(n); /* unlink_request() */
1354
1355
dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1356
unlink_request(osd, req);
1357
link_request(&osdc->homeless_osd, req);
1358
}
1359
for (n = rb_first(&osd->o_linger_requests); n; ) {
1360
struct ceph_osd_linger_request *lreq =
1361
rb_entry(n, struct ceph_osd_linger_request, node);
1362
1363
n = rb_next(n); /* unlink_linger() */
1364
1365
dout(" reassigning lreq %p linger_id %llu\n", lreq,
1366
lreq->linger_id);
1367
unlink_linger(osd, lreq);
1368
link_linger(&osdc->homeless_osd, lreq);
1369
}
1370
clear_backoffs(osd);
1371
1372
__remove_osd_from_lru(osd);
1373
erase_osd(&osdc->osds, osd);
1374
put_osd(osd);
1375
}
1376
1377
/*
1378
* reset osd connect
1379
*/
1380
static int reopen_osd(struct ceph_osd *osd)
1381
{
1382
struct ceph_entity_addr *peer_addr;
1383
1384
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1385
1386
if (RB_EMPTY_ROOT(&osd->o_requests) &&
1387
RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1388
close_osd(osd);
1389
return -ENODEV;
1390
}
1391
1392
peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1393
if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1394
!ceph_con_opened(&osd->o_con)) {
1395
struct rb_node *n;
1396
1397
dout("osd addr hasn't changed and connection never opened, "
1398
"letting msgr retry\n");
1399
/* touch each r_stamp for handle_timeout()'s benefit */
1400
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1401
struct ceph_osd_request *req =
1402
rb_entry(n, struct ceph_osd_request, r_node);
1403
req->r_stamp = jiffies;
1404
}
1405
1406
return -EAGAIN;
1407
}
1408
1409
ceph_con_close(&osd->o_con);
1410
ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1411
osd->o_incarnation++;
1412
1413
return 0;
1414
}
1415
1416
static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1417
bool wrlocked)
1418
{
1419
struct ceph_osd *osd;
1420
1421
if (wrlocked)
1422
verify_osdc_wrlocked(osdc);
1423
else
1424
verify_osdc_locked(osdc);
1425
1426
if (o != CEPH_HOMELESS_OSD)
1427
osd = lookup_osd(&osdc->osds, o);
1428
else
1429
osd = &osdc->homeless_osd;
1430
if (!osd) {
1431
if (!wrlocked)
1432
return ERR_PTR(-EAGAIN);
1433
1434
osd = create_osd(osdc, o);
1435
insert_osd(&osdc->osds, osd);
1436
ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1437
&osdc->osdmap->osd_addr[osd->o_osd]);
1438
}
1439
1440
dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1441
return osd;
1442
}
1443
1444
/*
1445
* Create request <-> OSD session relation.
1446
*
1447
* @req has to be assigned a tid, @osd may be homeless.
1448
*/
1449
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1450
{
1451
verify_osd_locked(osd);
1452
WARN_ON(!req->r_tid || req->r_osd);
1453
dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1454
req, req->r_tid);
1455
1456
if (!osd_homeless(osd))
1457
__remove_osd_from_lru(osd);
1458
else
1459
atomic_inc(&osd->o_osdc->num_homeless);
1460
1461
get_osd(osd);
1462
spin_lock(&osd->o_requests_lock);
1463
insert_request(&osd->o_requests, req);
1464
spin_unlock(&osd->o_requests_lock);
1465
req->r_osd = osd;
1466
}
1467
1468
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1469
{
1470
verify_osd_locked(osd);
1471
WARN_ON(req->r_osd != osd);
1472
dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1473
req, req->r_tid);
1474
1475
req->r_osd = NULL;
1476
spin_lock(&osd->o_requests_lock);
1477
erase_request(&osd->o_requests, req);
1478
spin_unlock(&osd->o_requests_lock);
1479
put_osd(osd);
1480
1481
if (!osd_homeless(osd))
1482
maybe_move_osd_to_lru(osd);
1483
else
1484
atomic_dec(&osd->o_osdc->num_homeless);
1485
}
1486
1487
static bool __pool_full(struct ceph_pg_pool_info *pi)
1488
{
1489
return pi->flags & CEPH_POOL_FLAG_FULL;
1490
}
1491
1492
static bool have_pool_full(struct ceph_osd_client *osdc)
1493
{
1494
struct rb_node *n;
1495
1496
for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1497
struct ceph_pg_pool_info *pi =
1498
rb_entry(n, struct ceph_pg_pool_info, node);
1499
1500
if (__pool_full(pi))
1501
return true;
1502
}
1503
1504
return false;
1505
}
1506
1507
static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1508
{
1509
struct ceph_pg_pool_info *pi;
1510
1511
pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1512
if (!pi)
1513
return false;
1514
1515
return __pool_full(pi);
1516
}
1517
1518
/*
1519
* Returns whether a request should be blocked from being sent
1520
* based on the current osdmap and osd_client settings.
1521
*/
1522
static bool target_should_be_paused(struct ceph_osd_client *osdc,
1523
const struct ceph_osd_request_target *t,
1524
struct ceph_pg_pool_info *pi)
1525
{
1526
bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1527
bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1528
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1529
__pool_full(pi);
1530
1531
WARN_ON(pi->id != t->target_oloc.pool);
1532
return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1533
((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1534
(osdc->osdmap->epoch < osdc->epoch_barrier);
1535
}
1536
1537
static int pick_random_replica(const struct ceph_osds *acting)
1538
{
1539
int i = get_random_u32_below(acting->size);
1540
1541
dout("%s picked osd%d, primary osd%d\n", __func__,
1542
acting->osds[i], acting->primary);
1543
return i;
1544
}
1545
1546
/*
1547
* Picks the closest replica based on client's location given by
1548
* crush_location option. Prefers the primary if the locality is
1549
* the same.
1550
*/
1551
static int pick_closest_replica(struct ceph_osd_client *osdc,
1552
const struct ceph_osds *acting)
1553
{
1554
struct ceph_options *opt = osdc->client->options;
1555
int best_i, best_locality;
1556
int i = 0, locality;
1557
1558
do {
1559
locality = ceph_get_crush_locality(osdc->osdmap,
1560
acting->osds[i],
1561
&opt->crush_locs);
1562
if (i == 0 ||
1563
(locality >= 0 && best_locality < 0) ||
1564
(locality >= 0 && best_locality >= 0 &&
1565
locality < best_locality)) {
1566
best_i = i;
1567
best_locality = locality;
1568
}
1569
} while (++i < acting->size);
1570
1571
dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1572
acting->osds[best_i], best_locality, acting->primary);
1573
return best_i;
1574
}
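/*
 * Note that the loop above only switches best_i when it finds a strictly
 * better locality (a known locality beating an unknown one, or a smaller
 * value), so on a tie the earliest entry (the primary at index 0) is
 * kept, which is what "prefers the primary" means.
 */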
1575
1576
enum calc_target_result {
1577
CALC_TARGET_NO_ACTION = 0,
1578
CALC_TARGET_NEED_RESEND,
1579
CALC_TARGET_POOL_DNE,
1580
};
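/*
 * CALC_TARGET_NO_ACTION: the mapping is unchanged, nothing to do.
 * CALC_TARGET_NEED_RESEND: the target changed (new PG mapping, acting set,
 *	PG split or an unpause) and the request must be re-sent, possibly
 *	to a different OSD.
 * CALC_TARGET_POOL_DNE: the base or target pool does not exist in the
 *	current osdmap; t->osd is set to CEPH_HOMELESS_OSD.
 */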
1581
1582
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1583
struct ceph_osd_request_target *t,
1584
bool any_change)
1585
{
1586
struct ceph_pg_pool_info *pi;
1587
struct ceph_pg pgid, last_pgid;
1588
struct ceph_osds up, acting;
1589
bool should_be_paused;
1590
bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1591
bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
1592
bool force_resend = false;
1593
bool unpaused = false;
1594
bool legacy_change = false;
1595
bool split = false;
1596
bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1597
bool recovery_deletes = ceph_osdmap_flag(osdc,
1598
CEPH_OSDMAP_RECOVERY_DELETES);
1599
enum calc_target_result ct_res;
1600
1601
t->epoch = osdc->osdmap->epoch;
1602
pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1603
if (!pi) {
1604
t->osd = CEPH_HOMELESS_OSD;
1605
ct_res = CALC_TARGET_POOL_DNE;
1606
goto out;
1607
}
1608
1609
if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1610
if (t->last_force_resend < pi->last_force_request_resend) {
1611
t->last_force_resend = pi->last_force_request_resend;
1612
force_resend = true;
1613
} else if (t->last_force_resend == 0) {
1614
force_resend = true;
1615
}
1616
}
1617
1618
/* apply tiering */
1619
ceph_oid_copy(&t->target_oid, &t->base_oid);
1620
ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1621
if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1622
if (is_read && pi->read_tier >= 0)
1623
t->target_oloc.pool = pi->read_tier;
1624
if (is_write && pi->write_tier >= 0)
1625
t->target_oloc.pool = pi->write_tier;
1626
1627
pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1628
if (!pi) {
1629
t->osd = CEPH_HOMELESS_OSD;
1630
ct_res = CALC_TARGET_POOL_DNE;
1631
goto out;
1632
}
1633
}
1634
1635
__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1636
last_pgid.pool = pgid.pool;
1637
last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1638
1639
ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1640
if (any_change &&
1641
ceph_is_new_interval(&t->acting,
1642
&acting,
1643
&t->up,
1644
&up,
1645
t->size,
1646
pi->size,
1647
t->min_size,
1648
pi->min_size,
1649
t->pg_num,
1650
pi->pg_num,
1651
t->sort_bitwise,
1652
sort_bitwise,
1653
t->recovery_deletes,
1654
recovery_deletes,
1655
&last_pgid))
1656
force_resend = true;
1657
1658
should_be_paused = target_should_be_paused(osdc, t, pi);
1659
if (t->paused && !should_be_paused) {
1660
unpaused = true;
1661
}
1662
if (t->paused != should_be_paused) {
1663
dout("%s t %p paused %d -> %d\n", __func__, t, t->paused,
1664
should_be_paused);
1665
t->paused = should_be_paused;
1666
}
1667
1668
legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1669
ceph_osds_changed(&t->acting, &acting,
1670
t->used_replica || any_change);
1671
if (t->pg_num)
1672
split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1673
1674
if (legacy_change || force_resend || split) {
1675
t->pgid = pgid; /* struct */
1676
ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1677
ceph_osds_copy(&t->acting, &acting);
1678
ceph_osds_copy(&t->up, &up);
1679
t->size = pi->size;
1680
t->min_size = pi->min_size;
1681
t->pg_num = pi->pg_num;
1682
t->pg_num_mask = pi->pg_num_mask;
1683
t->sort_bitwise = sort_bitwise;
1684
t->recovery_deletes = recovery_deletes;
1685
1686
if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1687
CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1688
!is_write && pi->type == CEPH_POOL_TYPE_REP &&
1689
acting.size > 1) {
1690
int pos;
1691
1692
WARN_ON(!is_read || acting.osds[0] != acting.primary);
1693
if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1694
pos = pick_random_replica(&acting);
1695
} else {
1696
pos = pick_closest_replica(osdc, &acting);
1697
}
1698
t->osd = acting.osds[pos];
1699
t->used_replica = pos > 0;
1700
} else {
1701
t->osd = acting.primary;
1702
t->used_replica = false;
1703
}
1704
}
1705
1706
if (unpaused || legacy_change || force_resend || split)
1707
ct_res = CALC_TARGET_NEED_RESEND;
1708
else
1709
ct_res = CALC_TARGET_NO_ACTION;
1710
1711
out:
1712
dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1713
legacy_change, force_resend, split, ct_res, t->osd);
1714
return ct_res;
1715
}
1716
1717
static struct ceph_spg_mapping *alloc_spg_mapping(void)
1718
{
1719
struct ceph_spg_mapping *spg;
1720
1721
spg = kmalloc(sizeof(*spg), GFP_NOIO);
1722
if (!spg)
1723
return NULL;
1724
1725
RB_CLEAR_NODE(&spg->node);
1726
spg->backoffs = RB_ROOT;
1727
return spg;
1728
}
1729
1730
static void free_spg_mapping(struct ceph_spg_mapping *spg)
1731
{
1732
WARN_ON(!RB_EMPTY_NODE(&spg->node));
1733
WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1734
1735
kfree(spg);
1736
}
1737
1738
/*
1739
* rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1740
* ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is
1741
* defined only within a specific spgid; it does not pass anything to
1742
* children on split, or to another primary.
1743
*/
1744
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1745
RB_BYPTR, const struct ceph_spg *, node)
1746
1747
static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1748
{
1749
return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1750
}
1751
1752
static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1753
void **pkey, size_t *pkey_len)
1754
{
1755
if (hoid->key_len) {
1756
*pkey = hoid->key;
1757
*pkey_len = hoid->key_len;
1758
} else {
1759
*pkey = hoid->oid;
1760
*pkey_len = hoid->oid_len;
1761
}
1762
}
1763
1764
static int compare_names(const void *name1, size_t name1_len,
1765
const void *name2, size_t name2_len)
1766
{
1767
int ret;
1768
1769
ret = memcmp(name1, name2, min(name1_len, name2_len));
1770
if (!ret) {
1771
if (name1_len < name2_len)
1772
ret = -1;
1773
else if (name1_len > name2_len)
1774
ret = 1;
1775
}
1776
return ret;
1777
}
1778
1779
static int hoid_compare(const struct ceph_hobject_id *lhs,
1780
const struct ceph_hobject_id *rhs)
1781
{
1782
void *effective_key1, *effective_key2;
1783
size_t effective_key1_len, effective_key2_len;
1784
int ret;
1785
1786
if (lhs->is_max < rhs->is_max)
1787
return -1;
1788
if (lhs->is_max > rhs->is_max)
1789
return 1;
1790
1791
if (lhs->pool < rhs->pool)
1792
return -1;
1793
if (lhs->pool > rhs->pool)
1794
return 1;
1795
1796
if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1797
return -1;
1798
if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1799
return 1;
1800
1801
ret = compare_names(lhs->nspace, lhs->nspace_len,
1802
rhs->nspace, rhs->nspace_len);
1803
if (ret)
1804
return ret;
1805
1806
hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1807
hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1808
ret = compare_names(effective_key1, effective_key1_len,
1809
effective_key2, effective_key2_len);
1810
if (ret)
1811
return ret;
1812
1813
ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1814
if (ret)
1815
return ret;
1816
1817
if (lhs->snapid < rhs->snapid)
1818
return -1;
1819
if (lhs->snapid > rhs->snapid)
1820
return 1;
1821
1822
return 0;
1823
}
1824
1825
/*
1826
* For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1827
* compat stuff here.
1828
*
1829
* Assumes @hoid is zero-initialized.
1830
*/
1831
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1832
{
1833
u8 struct_v;
1834
u32 struct_len;
1835
int ret;
1836
1837
ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1838
&struct_len);
1839
if (ret)
1840
return ret;
1841
1842
if (struct_v < 4) {
1843
pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1844
goto e_inval;
1845
}
1846
1847
hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1848
GFP_NOIO);
1849
if (IS_ERR(hoid->key)) {
1850
ret = PTR_ERR(hoid->key);
1851
hoid->key = NULL;
1852
return ret;
1853
}
1854
1855
hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1856
GFP_NOIO);
1857
if (IS_ERR(hoid->oid)) {
1858
ret = PTR_ERR(hoid->oid);
1859
hoid->oid = NULL;
1860
return ret;
1861
}
1862
1863
ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1864
ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1865
ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1866
1867
hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1868
GFP_NOIO);
1869
if (IS_ERR(hoid->nspace)) {
1870
ret = PTR_ERR(hoid->nspace);
1871
hoid->nspace = NULL;
1872
return ret;
1873
}
1874
1875
ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1876
1877
ceph_hoid_build_hash_cache(hoid);
1878
return 0;
1879
1880
e_inval:
1881
return -EINVAL;
1882
}
1883
1884
static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1885
{
1886
return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1887
4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1888
}
1889
1890
static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1891
{
1892
ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1893
ceph_encode_string(p, end, hoid->key, hoid->key_len);
1894
ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1895
ceph_encode_64(p, hoid->snapid);
1896
ceph_encode_32(p, hoid->hash);
1897
ceph_encode_8(p, hoid->is_max);
1898
ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1899
ceph_encode_64(p, hoid->pool);
1900
}
1901
1902
static void free_hoid(struct ceph_hobject_id *hoid)
1903
{
1904
if (hoid) {
1905
kfree(hoid->key);
1906
kfree(hoid->oid);
1907
kfree(hoid->nspace);
1908
kfree(hoid);
1909
}
1910
}
1911
1912
static struct ceph_osd_backoff *alloc_backoff(void)
1913
{
1914
struct ceph_osd_backoff *backoff;
1915
1916
backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1917
if (!backoff)
1918
return NULL;
1919
1920
RB_CLEAR_NODE(&backoff->spg_node);
1921
RB_CLEAR_NODE(&backoff->id_node);
1922
return backoff;
1923
}
1924
1925
static void free_backoff(struct ceph_osd_backoff *backoff)
1926
{
1927
WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1928
WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1929
1930
free_hoid(backoff->begin);
1931
free_hoid(backoff->end);
1932
kfree(backoff);
1933
}
1934
1935
/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);
1941
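/*
 * Find the backoff whose [->begin, ->end) interval contains @hoid,
 * if any.  The search walks the tree keyed by ->begin and checks the
 * candidate's ->end, which is sufficient as long as the installed
 * intervals don't overlap (which the OSD is expected to guarantee).
 */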
static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1942
const struct ceph_hobject_id *hoid)
1943
{
1944
struct rb_node *n = root->rb_node;
1945
1946
while (n) {
1947
struct ceph_osd_backoff *cur =
1948
rb_entry(n, struct ceph_osd_backoff, spg_node);
1949
int cmp;
1950
1951
cmp = hoid_compare(hoid, cur->begin);
1952
if (cmp < 0) {
1953
n = n->rb_left;
1954
} else if (cmp > 0) {
1955
if (hoid_compare(hoid, cur->end) < 0)
1956
return cur;
1957
1958
n = n->rb_right;
1959
} else {
1960
return cur;
1961
}
1962
}
1963
1964
return NULL;
1965
}
1966
1967
/*
 * Each backoff has a unique id within its OSD session.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1972
static void clear_backoffs(struct ceph_osd *osd)
1973
{
1974
while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1975
struct ceph_spg_mapping *spg =
1976
rb_entry(rb_first(&osd->o_backoff_mappings),
1977
struct ceph_spg_mapping, node);
1978
1979
while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1980
struct ceph_osd_backoff *backoff =
1981
rb_entry(rb_first(&spg->backoffs),
1982
struct ceph_osd_backoff, spg_node);
1983
1984
erase_backoff(&spg->backoffs, backoff);
1985
erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1986
free_backoff(backoff);
1987
}
1988
erase_spg_mapping(&osd->o_backoff_mappings, spg);
1989
free_spg_mapping(spg);
1990
}
1991
}
1992
1993
/*
 * Set up a temporary, non-owning view into @t.
 */
static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1997
const struct ceph_osd_request_target *t)
1998
{
1999
hoid->key = NULL;
2000
hoid->key_len = 0;
2001
hoid->oid = t->target_oid.name;
2002
hoid->oid_len = t->target_oid.name_len;
2003
hoid->snapid = CEPH_NOSNAP;
2004
hoid->hash = t->pgid.seed;
2005
hoid->is_max = false;
2006
if (t->target_oloc.pool_ns) {
2007
hoid->nspace = t->target_oloc.pool_ns->str;
2008
hoid->nspace_len = t->target_oloc.pool_ns->len;
2009
} else {
2010
hoid->nspace = NULL;
2011
hoid->nspace_len = 0;
2012
}
2013
hoid->pool = t->target_oloc.pool;
2014
ceph_hoid_build_hash_cache(hoid);
2015
}
2016
2017
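/*
 * Check whether @req targets an hobject that is currently covered by
 * an OSD backoff on its session.  If it is, send_request() returns
 * early and the request stays linked to the session but is not put
 * on the wire.
 */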
static bool should_plug_request(struct ceph_osd_request *req)
2018
{
2019
struct ceph_osd *osd = req->r_osd;
2020
struct ceph_spg_mapping *spg;
2021
struct ceph_osd_backoff *backoff;
2022
struct ceph_hobject_id hoid;
2023
2024
spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
2025
if (!spg)
2026
return false;
2027
2028
hoid_fill_from_target(&hoid, &req->r_t);
2029
backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
2030
if (!backoff)
2031
return false;
2032
2033
dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
2034
__func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2035
backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
2036
return true;
2037
}
2038
2039
/*
 * Keep get_num_data_items() in sync with this function.
 */
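/*
 * Attach each op's data items to the appropriate message: outgoing
 * buffers (writes, setxattr, notify-ack, copy-from payloads) go on
 * the request message, receive buffers (reads, stat, list-watchers)
 * go on the reply message, and CALL/NOTIFY contribute to both.
 */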
static void setup_request_data(struct ceph_osd_request *req)
2043
{
2044
struct ceph_msg *request_msg = req->r_request;
2045
struct ceph_msg *reply_msg = req->r_reply;
2046
struct ceph_osd_req_op *op;
2047
2048
if (req->r_request->num_data_items || req->r_reply->num_data_items)
2049
return;
2050
2051
WARN_ON(request_msg->data_length || reply_msg->data_length);
2052
for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
2053
switch (op->op) {
2054
/* request */
2055
case CEPH_OSD_OP_WRITE:
2056
case CEPH_OSD_OP_WRITEFULL:
2057
WARN_ON(op->indata_len != op->extent.length);
2058
ceph_osdc_msg_data_add(request_msg,
2059
&op->extent.osd_data);
2060
break;
2061
case CEPH_OSD_OP_SETXATTR:
2062
case CEPH_OSD_OP_CMPXATTR:
2063
WARN_ON(op->indata_len != op->xattr.name_len +
2064
op->xattr.value_len);
2065
ceph_osdc_msg_data_add(request_msg,
2066
&op->xattr.osd_data);
2067
break;
2068
case CEPH_OSD_OP_NOTIFY_ACK:
2069
ceph_osdc_msg_data_add(request_msg,
2070
&op->notify_ack.request_data);
2071
break;
2072
case CEPH_OSD_OP_COPY_FROM2:
2073
ceph_osdc_msg_data_add(request_msg,
2074
&op->copy_from.osd_data);
2075
break;
2076
2077
/* reply */
2078
case CEPH_OSD_OP_STAT:
2079
ceph_osdc_msg_data_add(reply_msg,
2080
&op->raw_data_in);
2081
break;
2082
case CEPH_OSD_OP_READ:
2083
case CEPH_OSD_OP_SPARSE_READ:
2084
ceph_osdc_msg_data_add(reply_msg,
2085
&op->extent.osd_data);
2086
break;
2087
case CEPH_OSD_OP_LIST_WATCHERS:
2088
ceph_osdc_msg_data_add(reply_msg,
2089
&op->list_watchers.response_data);
2090
break;
2091
2092
/* both */
2093
case CEPH_OSD_OP_CALL:
2094
WARN_ON(op->indata_len != op->cls.class_len +
2095
op->cls.method_len +
2096
op->cls.indata_len);
2097
ceph_osdc_msg_data_add(request_msg,
2098
&op->cls.request_info);
2099
/* optional, can be NONE */
2100
ceph_osdc_msg_data_add(request_msg,
2101
&op->cls.request_data);
2102
/* optional, can be NONE */
2103
ceph_osdc_msg_data_add(reply_msg,
2104
&op->cls.response_data);
2105
break;
2106
case CEPH_OSD_OP_NOTIFY:
2107
ceph_osdc_msg_data_add(request_msg,
2108
&op->notify.request_data);
2109
ceph_osdc_msg_data_add(reply_msg,
2110
&op->notify.response_data);
2111
break;
2112
}
2113
}
2114
}
2115
2116
static void encode_pgid(void **p, const struct ceph_pg *pgid)
2117
{
2118
ceph_encode_8(p, 1);
2119
ceph_encode_64(p, pgid->pool);
2120
ceph_encode_32(p, pgid->seed);
2121
ceph_encode_32(p, -1); /* preferred */
2122
}
2123
2124
static void encode_spgid(void **p, const struct ceph_spg *spgid)
2125
{
2126
ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
2127
encode_pgid(p, &spgid->pgid);
2128
ceph_encode_8(p, spgid->shard);
2129
}
2130
2131
static void encode_oloc(void **p, void *end,
2132
const struct ceph_object_locator *oloc)
2133
{
2134
ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2135
ceph_encode_64(p, oloc->pool);
2136
ceph_encode_32(p, -1); /* preferred */
2137
ceph_encode_32(p, 0); /* key len */
2138
if (oloc->pool_ns)
2139
ceph_encode_string(p, end, oloc->pool_ns->str,
2140
oloc->pool_ns->len);
2141
else
2142
ceph_encode_32(p, 0);
2143
}
2144
2145
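/*
 * Encode the MOSDOp v8 front up to (but not including) the trailing
 * features field.  The message is finalized in encode_request_finish()
 * once the peer is known: a luminous+ OSD gets the features appended,
 * while a pre-luminous OSD gets the front reencoded as MOSDOp v4.
 */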
static void encode_request_partial(struct ceph_osd_request *req,
2146
struct ceph_msg *msg)
2147
{
2148
void *p = msg->front.iov_base;
2149
void *const end = p + msg->front_alloc_len;
2150
u32 data_len = 0;
2151
int i;
2152
2153
if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
2154
/* snapshots aren't writeable */
2155
WARN_ON(req->r_snapid != CEPH_NOSNAP);
2156
} else {
2157
WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
2158
req->r_data_offset || req->r_snapc);
2159
}
2160
2161
setup_request_data(req);
2162
2163
encode_spgid(&p, &req->r_t.spgid); /* actual spg */
2164
ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
2165
ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
2166
ceph_encode_32(&p, req->r_flags);
2167
2168
/* reqid */
2169
ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
2170
memset(p, 0, sizeof(struct ceph_osd_reqid));
2171
p += sizeof(struct ceph_osd_reqid);
2172
2173
/* trace */
2174
memset(p, 0, sizeof(struct ceph_blkin_trace_info));
2175
p += sizeof(struct ceph_blkin_trace_info);
2176
2177
ceph_encode_32(&p, 0); /* client_inc, always 0 */
2178
ceph_encode_timespec64(p, &req->r_mtime);
2179
p += sizeof(struct ceph_timespec);
2180
2181
encode_oloc(&p, end, &req->r_t.target_oloc);
2182
ceph_encode_string(&p, end, req->r_t.target_oid.name,
2183
req->r_t.target_oid.name_len);
2184
2185
/* ops, can imply data */
2186
ceph_encode_16(&p, req->r_num_ops);
2187
for (i = 0; i < req->r_num_ops; i++) {
2188
data_len += osd_req_encode_op(p, &req->r_ops[i]);
2189
p += sizeof(struct ceph_osd_op);
2190
}
2191
2192
ceph_encode_64(&p, req->r_snapid); /* snapid */
2193
if (req->r_snapc) {
2194
ceph_encode_64(&p, req->r_snapc->seq);
2195
ceph_encode_32(&p, req->r_snapc->num_snaps);
2196
for (i = 0; i < req->r_snapc->num_snaps; i++)
2197
ceph_encode_64(&p, req->r_snapc->snaps[i]);
2198
} else {
2199
ceph_encode_64(&p, 0); /* snap_seq */
2200
ceph_encode_32(&p, 0); /* snaps len */
2201
}
2202
2203
ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
2204
BUG_ON(p > end - 8); /* space for features */
2205
2206
msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
2207
/* front_len is finalized in encode_request_finish() */
2208
msg->front.iov_len = p - msg->front.iov_base;
2209
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2210
msg->hdr.data_len = cpu_to_le32(data_len);
2211
/*
 * The header "data_off" is a hint to the receiver allowing it
 * to align received data into its buffers such that there's no
 * need to re-copy it before writing it to disk (direct I/O).
 */
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
2217
2218
dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
2219
req->r_t.target_oid.name, req->r_t.target_oid.name_len);
2220
}
2221
2222
static void encode_request_finish(struct ceph_msg *msg)
2223
{
2224
void *p = msg->front.iov_base;
2225
void *const partial_end = p + msg->front.iov_len;
2226
void *const end = p + msg->front_alloc_len;
2227
2228
if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2229
/* luminous OSD -- encode features and be done */
2230
p = partial_end;
2231
ceph_encode_64(&p, msg->con->peer_features);
2232
} else {
2233
struct {
2234
char spgid[CEPH_ENCODING_START_BLK_LEN +
2235
CEPH_PGID_ENCODING_LEN + 1];
2236
__le32 hash;
2237
__le32 epoch;
2238
__le32 flags;
2239
char reqid[CEPH_ENCODING_START_BLK_LEN +
2240
sizeof(struct ceph_osd_reqid)];
2241
char trace[sizeof(struct ceph_blkin_trace_info)];
2242
__le32 client_inc;
2243
struct ceph_timespec mtime;
2244
} __packed head;
2245
struct ceph_pg pgid;
2246
void *oloc, *oid, *tail;
2247
int oloc_len, oid_len, tail_len;
2248
int len;
2249
2250
/*
 * Pre-luminous OSD -- reencode v8 into v4 using @head
 * as a temporary buffer.  Encode the raw PG; the rest
 * is just a matter of moving oloc, oid and tail blobs
 * around.
 */
memcpy(&head, p, sizeof(head));
2257
p += sizeof(head);
2258
2259
oloc = p;
2260
p += CEPH_ENCODING_START_BLK_LEN;
2261
pgid.pool = ceph_decode_64(&p);
2262
p += 4 + 4; /* preferred, key len */
2263
len = ceph_decode_32(&p);
2264
p += len; /* nspace */
2265
oloc_len = p - oloc;
2266
2267
oid = p;
2268
len = ceph_decode_32(&p);
2269
p += len;
2270
oid_len = p - oid;
2271
2272
tail = p;
2273
tail_len = partial_end - p;
2274
2275
p = msg->front.iov_base;
2276
ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2277
ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2278
ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2279
ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2280
2281
/* reassert_version */
2282
memset(p, 0, sizeof(struct ceph_eversion));
2283
p += sizeof(struct ceph_eversion);
2284
2285
BUG_ON(p >= oloc);
2286
memmove(p, oloc, oloc_len);
2287
p += oloc_len;
2288
2289
pgid.seed = le32_to_cpu(head.hash);
2290
encode_pgid(&p, &pgid); /* raw pg */
2291
2292
BUG_ON(p >= oid);
2293
memmove(p, oid, oid_len);
2294
p += oid_len;
2295
2296
/* tail -- ops, snapid, snapc, retry_attempt */
2297
BUG_ON(p >= tail);
2298
memmove(p, tail, tail_len);
2299
p += tail_len;
2300
2301
msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2302
}
2303
2304
BUG_ON(p > end);
2305
msg->front.iov_len = p - msg->front.iov_base;
2306
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2307
2308
dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2309
le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2310
le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2311
le16_to_cpu(msg->hdr.version));
2312
}
2313
2314
/*
 * @req has to be assigned a tid and registered.
 */
static void send_request(struct ceph_osd_request *req)
2318
{
2319
struct ceph_osd *osd = req->r_osd;
2320
2321
verify_osd_locked(osd);
2322
WARN_ON(osd->o_osd != req->r_t.osd);
2323
2324
/* backoff? */
2325
if (should_plug_request(req))
2326
return;
2327
2328
/*
 * We may have a previously queued request message hanging
 * around.  Cancel it to avoid corrupting the msgr.
 */
if (req->r_sent)
2333
ceph_msg_revoke(req->r_request);
2334
2335
req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2336
if (req->r_attempts)
2337
req->r_flags |= CEPH_OSD_FLAG_RETRY;
2338
else
2339
WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2340
2341
encode_request_partial(req, req->r_request);
2342
2343
dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2344
__func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2345
req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2346
req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2347
req->r_attempts);
2348
2349
req->r_t.paused = false;
2350
req->r_stamp = jiffies;
2351
req->r_attempts++;
2352
2353
req->r_sent = osd->o_incarnation;
2354
req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2355
ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2356
}
2357
2358
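/*
 * Subscribe to osdmap updates from the monitor: continuously while
 * the cluster is full or paused (so we notice as soon as the flag
 * clears), otherwise just a one-shot request for the next epoch.
 */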
static void maybe_request_map(struct ceph_osd_client *osdc)
2359
{
2360
bool continuous = false;
2361
2362
verify_osdc_locked(osdc);
2363
WARN_ON(!osdc->osdmap->epoch);
2364
2365
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2366
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2367
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2368
dout("%s osdc %p continuous\n", __func__, osdc);
2369
continuous = true;
2370
} else {
2371
dout("%s osdc %p onetime\n", __func__, osdc);
2372
}
2373
2374
if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2375
osdc->osdmap->epoch + 1, continuous))
2376
ceph_monc_renew_subs(&osdc->client->monc);
2377
}
2378
2379
static void complete_request(struct ceph_osd_request *req, int err);
2380
static void send_map_check(struct ceph_osd_request *req);
2381
2382
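/*
 * Map the request to a target and hand it to an OSD session.  Called
 * with osdc->lock held for read (wrlocked == false) or write; if the
 * target pool doesn't exist or a new session has to be created, the
 * read lock is promoted to a write lock and the calculation redone.
 */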
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2383
{
2384
struct ceph_osd_client *osdc = req->r_osdc;
2385
struct ceph_osd *osd;
2386
enum calc_target_result ct_res;
2387
int err = 0;
2388
bool need_send = false;
2389
bool promoted = false;
2390
2391
WARN_ON(req->r_tid);
2392
dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2393
2394
again:
2395
ct_res = calc_target(osdc, &req->r_t, false);
2396
if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2397
goto promote;
2398
2399
osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2400
if (IS_ERR(osd)) {
2401
WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2402
goto promote;
2403
}
2404
2405
if (osdc->abort_err) {
2406
dout("req %p abort_err %d\n", req, osdc->abort_err);
2407
err = osdc->abort_err;
2408
} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2409
dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2410
osdc->epoch_barrier);
2411
req->r_t.paused = true;
2412
maybe_request_map(osdc);
2413
} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2414
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2415
dout("req %p pausewr\n", req);
2416
req->r_t.paused = true;
2417
maybe_request_map(osdc);
2418
} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2419
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2420
dout("req %p pauserd\n", req);
2421
req->r_t.paused = true;
2422
maybe_request_map(osdc);
2423
} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2424
!(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2425
CEPH_OSD_FLAG_FULL_FORCE)) &&
2426
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2427
pool_full(osdc, req->r_t.base_oloc.pool))) {
2428
dout("req %p full/pool_full\n", req);
2429
if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
2430
err = -ENOSPC;
2431
} else {
2432
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL))
2433
pr_warn_ratelimited("cluster is full (osdmap FULL)\n");
2434
else
2435
pr_warn_ratelimited("pool %lld is full or reached quota\n",
2436
req->r_t.base_oloc.pool);
2437
req->r_t.paused = true;
2438
maybe_request_map(osdc);
2439
}
2440
} else if (!osd_homeless(osd)) {
2441
need_send = true;
2442
} else {
2443
maybe_request_map(osdc);
2444
}
2445
2446
mutex_lock(&osd->lock);
2447
/*
 * Assign the tid atomically with send_request() to protect
 * multiple writes to the same object from racing with each
 * other, resulting in out of order ops on the OSDs.
 */
req->r_tid = atomic64_inc_return(&osdc->last_tid);
2453
link_request(osd, req);
2454
if (need_send)
2455
send_request(req);
2456
else if (err)
2457
complete_request(req, err);
2458
mutex_unlock(&osd->lock);
2459
2460
if (!err && ct_res == CALC_TARGET_POOL_DNE)
2461
send_map_check(req);
2462
2463
if (promoted)
2464
downgrade_write(&osdc->lock);
2465
return;
2466
2467
promote:
2468
up_read(&osdc->lock);
2469
down_write(&osdc->lock);
2470
wrlocked = true;
2471
promoted = true;
2472
goto again;
2473
}
2474
2475
static void account_request(struct ceph_osd_request *req)
2476
{
2477
WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2478
WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2479
2480
req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2481
atomic_inc(&req->r_osdc->num_requests);
2482
2483
req->r_start_stamp = jiffies;
2484
req->r_start_latency = ktime_get();
2485
}
2486
2487
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2488
{
2489
ceph_osdc_get_request(req);
2490
account_request(req);
2491
__submit_request(req, wrlocked);
2492
}
2493
2494
static void finish_request(struct ceph_osd_request *req)
2495
{
2496
struct ceph_osd_client *osdc = req->r_osdc;
2497
2498
WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2499
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2500
2501
req->r_end_latency = ktime_get();
2502
2503
if (req->r_osd) {
2504
ceph_init_sparse_read(&req->r_osd->o_sparse_read);
2505
unlink_request(req->r_osd, req);
2506
}
2507
atomic_dec(&osdc->num_requests);
2508
2509
/*
 * If an OSD has failed or returned and a request has been sent
 * twice, it's possible to get a reply and end up here while the
 * request message is queued for delivery.  We will ignore the
 * reply, so not a big deal, but better to try and catch it.
 */
ceph_msg_revoke(req->r_request);
2516
ceph_msg_revoke_incoming(req->r_reply);
2517
}
2518
2519
static void __complete_request(struct ceph_osd_request *req)
2520
{
2521
dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
2522
req->r_tid, req->r_callback, req->r_result);
2523
2524
if (req->r_callback)
2525
req->r_callback(req);
2526
complete_all(&req->r_completion);
2527
ceph_osdc_put_request(req);
2528
}
2529
2530
static void complete_request_workfn(struct work_struct *work)
2531
{
2532
struct ceph_osd_request *req =
2533
container_of(work, struct ceph_osd_request, r_complete_work);
2534
2535
__complete_request(req);
2536
}
2537
2538
/*
 * This is open-coded in handle_reply().
 */
static void complete_request(struct ceph_osd_request *req, int err)
2542
{
2543
dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2544
2545
req->r_result = err;
2546
finish_request(req);
2547
2548
INIT_WORK(&req->r_complete_work, complete_request_workfn);
2549
queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2550
}
2551
2552
static void cancel_map_check(struct ceph_osd_request *req)
2553
{
2554
struct ceph_osd_client *osdc = req->r_osdc;
2555
struct ceph_osd_request *lookup_req;
2556
2557
verify_osdc_wrlocked(osdc);
2558
2559
lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2560
if (!lookup_req)
2561
return;
2562
2563
WARN_ON(lookup_req != req);
2564
erase_request_mc(&osdc->map_checks, req);
2565
ceph_osdc_put_request(req);
2566
}
2567
2568
static void cancel_request(struct ceph_osd_request *req)
2569
{
2570
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2571
2572
cancel_map_check(req);
2573
finish_request(req);
2574
complete_all(&req->r_completion);
2575
ceph_osdc_put_request(req);
2576
}
2577
2578
static void abort_request(struct ceph_osd_request *req, int err)
2579
{
2580
dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2581
2582
cancel_map_check(req);
2583
complete_request(req, err);
2584
}
2585
2586
static int abort_fn(struct ceph_osd_request *req, void *arg)
2587
{
2588
int err = *(int *)arg;
2589
2590
abort_request(req, err);
2591
return 0; /* continue iteration */
2592
}
2593
2594
/*
 * Abort all in-flight requests with @err and arrange for all future
 * requests to be failed immediately.
 */
void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2599
{
2600
dout("%s osdc %p err %d\n", __func__, osdc, err);
2601
down_write(&osdc->lock);
2602
for_each_request(osdc, abort_fn, &err);
2603
osdc->abort_err = err;
2604
up_write(&osdc->lock);
2605
}
2606
EXPORT_SYMBOL(ceph_osdc_abort_requests);
2607
2608
void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2609
{
2610
down_write(&osdc->lock);
2611
osdc->abort_err = 0;
2612
up_write(&osdc->lock);
2613
}
2614
EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2615
2616
static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2617
{
2618
if (likely(eb > osdc->epoch_barrier)) {
2619
dout("updating epoch_barrier from %u to %u\n",
2620
osdc->epoch_barrier, eb);
2621
osdc->epoch_barrier = eb;
2622
/* Request a map if we haven't reached the barrier yet */
2623
if (eb > osdc->osdmap->epoch)
2624
maybe_request_map(osdc);
2625
}
2626
}
2627
2628
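/*
 * Raise the epoch barrier if @eb is newer than what we have.  The
 * cheap check is done under the read lock; only if the barrier
 * actually needs to move do we retake osdc->lock for write, and
 * update_epoch_barrier() rechecks under that lock since the value
 * may have changed in between.
 */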
void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2629
{
2630
down_read(&osdc->lock);
2631
if (unlikely(eb > osdc->epoch_barrier)) {
2632
up_read(&osdc->lock);
2633
down_write(&osdc->lock);
2634
update_epoch_barrier(osdc, eb);
2635
up_write(&osdc->lock);
2636
} else {
2637
up_read(&osdc->lock);
2638
}
2639
}
2640
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2641
2642
/*
 * We can end up releasing caps as a result of abort_request().
 * In that case, we probably want to ensure that the cap release message
 * has an updated epoch barrier in it, so set the epoch barrier prior to
 * aborting the first request.
 */
static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2649
{
2650
struct ceph_osd_client *osdc = req->r_osdc;
2651
bool *victims = arg;
2652
2653
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2654
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2655
pool_full(osdc, req->r_t.base_oloc.pool))) {
2656
if (!*victims) {
2657
update_epoch_barrier(osdc, osdc->osdmap->epoch);
2658
*victims = true;
2659
}
2660
abort_request(req, -ENOSPC);
2661
}
2662
2663
return 0; /* continue iteration */
2664
}
2665
2666
/*
 * Drop all pending requests that are stalled waiting on a full condition to
 * clear, and complete them with ENOSPC as the return code.  Set the
 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
 * cancelled.
 */
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2673
{
2674
bool victims = false;
2675
2676
if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2677
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2678
for_each_request(osdc, abort_on_full_fn, &victims);
2679
}
2680
2681
static void check_pool_dne(struct ceph_osd_request *req)
2682
{
2683
struct ceph_osd_client *osdc = req->r_osdc;
2684
struct ceph_osdmap *map = osdc->osdmap;
2685
2686
verify_osdc_wrlocked(osdc);
2687
WARN_ON(!map->epoch);
2688
2689
if (req->r_attempts) {
2690
/*
 * We sent a request earlier, which means that
 * previously the pool existed, and now it does not
 * (i.e., it was deleted).
 */
req->r_map_dne_bound = map->epoch;
2696
dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2697
req->r_tid);
2698
} else {
2699
dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2700
req, req->r_tid, req->r_map_dne_bound, map->epoch);
2701
}
2702
2703
if (req->r_map_dne_bound) {
2704
if (map->epoch >= req->r_map_dne_bound) {
2705
/* we had a new enough map */
2706
pr_info_ratelimited("tid %llu pool does not exist\n",
2707
req->r_tid);
2708
complete_request(req, -ENOENT);
2709
}
2710
} else {
2711
send_map_check(req);
2712
}
2713
}
2714
2715
static void map_check_cb(struct ceph_mon_generic_request *greq)
2716
{
2717
struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2718
struct ceph_osd_request *req;
2719
u64 tid = greq->private_data;
2720
2721
WARN_ON(greq->result || !greq->u.newest);
2722
2723
down_write(&osdc->lock);
2724
req = lookup_request_mc(&osdc->map_checks, tid);
2725
if (!req) {
2726
dout("%s tid %llu dne\n", __func__, tid);
2727
goto out_unlock;
2728
}
2729
2730
dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2731
req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2732
if (!req->r_map_dne_bound)
2733
req->r_map_dne_bound = greq->u.newest;
2734
erase_request_mc(&osdc->map_checks, req);
2735
check_pool_dne(req);
2736
2737
ceph_osdc_put_request(req);
2738
out_unlock:
2739
up_write(&osdc->lock);
2740
}
2741
2742
static void send_map_check(struct ceph_osd_request *req)
2743
{
2744
struct ceph_osd_client *osdc = req->r_osdc;
2745
struct ceph_osd_request *lookup_req;
2746
int ret;
2747
2748
verify_osdc_wrlocked(osdc);
2749
2750
lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2751
if (lookup_req) {
2752
WARN_ON(lookup_req != req);
2753
return;
2754
}
2755
2756
ceph_osdc_get_request(req);
2757
insert_request_mc(&osdc->map_checks, req);
2758
ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2759
map_check_cb, req->r_tid);
2760
WARN_ON(ret);
2761
}
2762
2763
/*
 * lingering requests, watch/notify v2 infrastructure
 */
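/*
 * A linger request (struct ceph_osd_linger_request) represents a
 * watch or notify that has to survive osdmap changes and connection
 * resets: ->reg_req (re)registers it with the OSD, ->ping_req keeps
 * an established watch alive, and the lreq is tracked both in the
 * osdc-wide tree and in the owning OSD session's tree.
 */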
static void linger_release(struct kref *kref)
2767
{
2768
struct ceph_osd_linger_request *lreq =
2769
container_of(kref, struct ceph_osd_linger_request, kref);
2770
2771
dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2772
lreq->reg_req, lreq->ping_req);
2773
WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2774
WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2775
WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2776
WARN_ON(!list_empty(&lreq->scan_item));
2777
WARN_ON(!list_empty(&lreq->pending_lworks));
2778
WARN_ON(lreq->osd);
2779
2780
if (lreq->request_pl)
2781
ceph_pagelist_release(lreq->request_pl);
2782
if (lreq->notify_id_pages)
2783
ceph_release_page_vector(lreq->notify_id_pages, 1);
2784
2785
ceph_osdc_put_request(lreq->reg_req);
2786
ceph_osdc_put_request(lreq->ping_req);
2787
target_destroy(&lreq->t);
2788
kfree(lreq);
2789
}
2790
2791
static void linger_put(struct ceph_osd_linger_request *lreq)
2792
{
2793
if (lreq)
2794
kref_put(&lreq->kref, linger_release);
2795
}
2796
2797
static struct ceph_osd_linger_request *
2798
linger_get(struct ceph_osd_linger_request *lreq)
2799
{
2800
kref_get(&lreq->kref);
2801
return lreq;
2802
}
2803
2804
static struct ceph_osd_linger_request *
2805
linger_alloc(struct ceph_osd_client *osdc)
2806
{
2807
struct ceph_osd_linger_request *lreq;
2808
2809
lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2810
if (!lreq)
2811
return NULL;
2812
2813
kref_init(&lreq->kref);
2814
mutex_init(&lreq->lock);
2815
RB_CLEAR_NODE(&lreq->node);
2816
RB_CLEAR_NODE(&lreq->osdc_node);
2817
RB_CLEAR_NODE(&lreq->mc_node);
2818
INIT_LIST_HEAD(&lreq->scan_item);
2819
INIT_LIST_HEAD(&lreq->pending_lworks);
2820
init_completion(&lreq->reg_commit_wait);
2821
init_completion(&lreq->notify_finish_wait);
2822
2823
lreq->osdc = osdc;
2824
target_init(&lreq->t);
2825
2826
dout("%s lreq %p\n", __func__, lreq);
2827
return lreq;
2828
}
2829
2830
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2831
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2832
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2833
2834
/*
 * Create linger request <-> OSD session relation.
 *
 * @lreq has to be registered, @osd may be homeless.
 */
static void link_linger(struct ceph_osd *osd,
2840
struct ceph_osd_linger_request *lreq)
2841
{
2842
verify_osd_locked(osd);
2843
WARN_ON(!lreq->linger_id || lreq->osd);
2844
dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2845
osd->o_osd, lreq, lreq->linger_id);
2846
2847
if (!osd_homeless(osd))
2848
__remove_osd_from_lru(osd);
2849
else
2850
atomic_inc(&osd->o_osdc->num_homeless);
2851
2852
get_osd(osd);
2853
insert_linger(&osd->o_linger_requests, lreq);
2854
lreq->osd = osd;
2855
}
2856
2857
static void unlink_linger(struct ceph_osd *osd,
2858
struct ceph_osd_linger_request *lreq)
2859
{
2860
verify_osd_locked(osd);
2861
WARN_ON(lreq->osd != osd);
2862
dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2863
osd->o_osd, lreq, lreq->linger_id);
2864
2865
lreq->osd = NULL;
2866
erase_linger(&osd->o_linger_requests, lreq);
2867
put_osd(osd);
2868
2869
if (!osd_homeless(osd))
2870
maybe_move_osd_to_lru(osd);
2871
else
2872
atomic_dec(&osd->o_osdc->num_homeless);
2873
}
2874
2875
static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2876
{
2877
verify_osdc_locked(lreq->osdc);
2878
2879
return !RB_EMPTY_NODE(&lreq->osdc_node);
2880
}
2881
2882
static bool linger_registered(struct ceph_osd_linger_request *lreq)
2883
{
2884
struct ceph_osd_client *osdc = lreq->osdc;
2885
bool registered;
2886
2887
down_read(&osdc->lock);
2888
registered = __linger_registered(lreq);
2889
up_read(&osdc->lock);
2890
2891
return registered;
2892
}
2893
2894
static void linger_register(struct ceph_osd_linger_request *lreq)
2895
{
2896
struct ceph_osd_client *osdc = lreq->osdc;
2897
2898
verify_osdc_wrlocked(osdc);
2899
WARN_ON(lreq->linger_id);
2900
2901
linger_get(lreq);
2902
lreq->linger_id = ++osdc->last_linger_id;
2903
insert_linger_osdc(&osdc->linger_requests, lreq);
2904
}
2905
2906
static void linger_unregister(struct ceph_osd_linger_request *lreq)
2907
{
2908
struct ceph_osd_client *osdc = lreq->osdc;
2909
2910
verify_osdc_wrlocked(osdc);
2911
2912
erase_linger_osdc(&osdc->linger_requests, lreq);
2913
linger_put(lreq);
2914
}
2915
2916
static void cancel_linger_request(struct ceph_osd_request *req)
2917
{
2918
struct ceph_osd_linger_request *lreq = req->r_priv;
2919
2920
WARN_ON(!req->r_linger);
2921
cancel_request(req);
2922
linger_put(lreq);
2923
}
2924
2925
struct linger_work {
2926
struct work_struct work;
2927
struct ceph_osd_linger_request *lreq;
2928
struct list_head pending_item;
2929
unsigned long queued_stamp;
2930
2931
union {
2932
struct {
2933
u64 notify_id;
2934
u64 notifier_id;
2935
void *payload; /* points into @msg front */
2936
size_t payload_len;
2937
2938
struct ceph_msg *msg; /* for ceph_msg_put() */
2939
} notify;
2940
struct {
2941
int err;
2942
} error;
2943
};
2944
};
2945
2946
static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2947
work_func_t workfn)
2948
{
2949
struct linger_work *lwork;
2950
2951
lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2952
if (!lwork)
2953
return NULL;
2954
2955
INIT_WORK(&lwork->work, workfn);
2956
INIT_LIST_HEAD(&lwork->pending_item);
2957
lwork->lreq = linger_get(lreq);
2958
2959
return lwork;
2960
}
2961
2962
static void lwork_free(struct linger_work *lwork)
2963
{
2964
struct ceph_osd_linger_request *lreq = lwork->lreq;
2965
2966
mutex_lock(&lreq->lock);
2967
list_del(&lwork->pending_item);
2968
mutex_unlock(&lreq->lock);
2969
2970
linger_put(lreq);
2971
kfree(lwork);
2972
}
2973
2974
static void lwork_queue(struct linger_work *lwork)
2975
{
2976
struct ceph_osd_linger_request *lreq = lwork->lreq;
2977
struct ceph_osd_client *osdc = lreq->osdc;
2978
2979
verify_lreq_locked(lreq);
2980
WARN_ON(!list_empty(&lwork->pending_item));
2981
2982
lwork->queued_stamp = jiffies;
2983
list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2984
queue_work(osdc->notify_wq, &lwork->work);
2985
}
2986
2987
static void do_watch_notify(struct work_struct *w)
2988
{
2989
struct linger_work *lwork = container_of(w, struct linger_work, work);
2990
struct ceph_osd_linger_request *lreq = lwork->lreq;
2991
2992
if (!linger_registered(lreq)) {
2993
dout("%s lreq %p not registered\n", __func__, lreq);
2994
goto out;
2995
}
2996
2997
WARN_ON(!lreq->is_watch);
2998
dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2999
__func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
3000
lwork->notify.payload_len);
3001
lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
3002
lwork->notify.notifier_id, lwork->notify.payload,
3003
lwork->notify.payload_len);
3004
3005
out:
3006
ceph_msg_put(lwork->notify.msg);
3007
lwork_free(lwork);
3008
}
3009
3010
static void do_watch_error(struct work_struct *w)
3011
{
3012
struct linger_work *lwork = container_of(w, struct linger_work, work);
3013
struct ceph_osd_linger_request *lreq = lwork->lreq;
3014
3015
if (!linger_registered(lreq)) {
3016
dout("%s lreq %p not registered\n", __func__, lreq);
3017
goto out;
3018
}
3019
3020
dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
3021
lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
3022
3023
out:
3024
lwork_free(lwork);
3025
}
3026
3027
static void queue_watch_error(struct ceph_osd_linger_request *lreq)
3028
{
3029
struct linger_work *lwork;
3030
3031
lwork = lwork_alloc(lreq, do_watch_error);
3032
if (!lwork) {
3033
pr_err("failed to allocate error-lwork\n");
3034
return;
3035
}
3036
3037
lwork->error.err = lreq->last_error;
3038
lwork_queue(lwork);
3039
}
3040
3041
static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
3042
int result)
3043
{
3044
if (!completion_done(&lreq->reg_commit_wait)) {
3045
lreq->reg_commit_error = (result <= 0 ? result : 0);
3046
complete_all(&lreq->reg_commit_wait);
3047
}
3048
}
3049
3050
static void linger_commit_cb(struct ceph_osd_request *req)
3051
{
3052
struct ceph_osd_linger_request *lreq = req->r_priv;
3053
3054
mutex_lock(&lreq->lock);
3055
if (req != lreq->reg_req) {
3056
dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3057
__func__, lreq, lreq->linger_id, req, lreq->reg_req);
3058
goto out;
3059
}
3060
3061
dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
3062
lreq->linger_id, req->r_result);
3063
linger_reg_commit_complete(lreq, req->r_result);
3064
lreq->committed = true;
3065
3066
if (!lreq->is_watch) {
3067
struct ceph_osd_data *osd_data =
3068
osd_req_op_data(req, 0, notify, response_data);
3069
void *p = page_address(osd_data->pages[0]);
3070
3071
WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3072
osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3073
3074
/* make note of the notify_id */
3075
if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3076
lreq->notify_id = ceph_decode_64(&p);
3077
dout("lreq %p notify_id %llu\n", lreq,
3078
lreq->notify_id);
3079
} else {
3080
dout("lreq %p no notify_id\n", lreq);
3081
}
3082
}
3083
3084
out:
3085
mutex_unlock(&lreq->lock);
3086
linger_put(lreq);
3087
}
3088
3089
static int normalize_watch_error(int err)
3090
{
3091
/*
 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
 * notification and a failure to reconnect because we raced with
 * the delete appear the same to the user.
 */
if (err == -ENOENT)
3097
err = -ENOTCONN;
3098
3099
return err;
3100
}
3101
3102
static void linger_reconnect_cb(struct ceph_osd_request *req)
3103
{
3104
struct ceph_osd_linger_request *lreq = req->r_priv;
3105
3106
mutex_lock(&lreq->lock);
3107
if (req != lreq->reg_req) {
3108
dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3109
__func__, lreq, lreq->linger_id, req, lreq->reg_req);
3110
goto out;
3111
}
3112
3113
dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3114
lreq, lreq->linger_id, req->r_result, lreq->last_error);
3115
if (req->r_result < 0) {
3116
if (!lreq->last_error) {
3117
lreq->last_error = normalize_watch_error(req->r_result);
3118
queue_watch_error(lreq);
3119
}
3120
}
3121
3122
out:
3123
mutex_unlock(&lreq->lock);
3124
linger_put(lreq);
3125
}
3126
3127
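/*
 * (Re)issue the registration request for @lreq: a committed watch is
 * re-established with CEPH_OSD_WATCH_OP_RECONNECT and a bumped
 * register_gen, a fresh watch uses CEPH_OSD_WATCH_OP_WATCH, and a
 * notify resends the NOTIFY op with the original payload.  Any
 * previously issued reg_req is cancelled and dropped first.
 */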
static void send_linger(struct ceph_osd_linger_request *lreq)
3128
{
3129
struct ceph_osd_client *osdc = lreq->osdc;
3130
struct ceph_osd_request *req;
3131
int ret;
3132
3133
verify_osdc_wrlocked(osdc);
3134
mutex_lock(&lreq->lock);
3135
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3136
3137
if (lreq->reg_req) {
3138
if (lreq->reg_req->r_osd)
3139
cancel_linger_request(lreq->reg_req);
3140
ceph_osdc_put_request(lreq->reg_req);
3141
}
3142
3143
req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3144
BUG_ON(!req);
3145
3146
target_copy(&req->r_t, &lreq->t);
3147
req->r_mtime = lreq->mtime;
3148
3149
if (lreq->is_watch && lreq->committed) {
3150
osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3151
lreq->linger_id, ++lreq->register_gen);
3152
dout("lreq %p reconnect register_gen %u\n", lreq,
3153
req->r_ops[0].watch.gen);
3154
req->r_callback = linger_reconnect_cb;
3155
} else {
3156
if (lreq->is_watch) {
3157
osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3158
lreq->linger_id, 0);
3159
} else {
3160
lreq->notify_id = 0;
3161
3162
refcount_inc(&lreq->request_pl->refcnt);
3163
osd_req_op_notify_init(req, 0, lreq->linger_id,
3164
lreq->request_pl);
3165
ceph_osd_data_pages_init(
3166
osd_req_op_data(req, 0, notify, response_data),
3167
lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3168
}
3169
dout("lreq %p register\n", lreq);
3170
req->r_callback = linger_commit_cb;
3171
}
3172
3173
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3174
BUG_ON(ret);
3175
3176
req->r_priv = linger_get(lreq);
3177
req->r_linger = true;
3178
lreq->reg_req = req;
3179
mutex_unlock(&lreq->lock);
3180
3181
submit_request(req, true);
3182
}
3183
3184
static void linger_ping_cb(struct ceph_osd_request *req)
3185
{
3186
struct ceph_osd_linger_request *lreq = req->r_priv;
3187
3188
mutex_lock(&lreq->lock);
3189
if (req != lreq->ping_req) {
3190
dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3191
__func__, lreq, lreq->linger_id, req, lreq->ping_req);
3192
goto out;
3193
}
3194
3195
dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3196
__func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3197
lreq->last_error);
3198
if (lreq->register_gen == req->r_ops[0].watch.gen) {
3199
if (!req->r_result) {
3200
lreq->watch_valid_thru = lreq->ping_sent;
3201
} else if (!lreq->last_error) {
3202
lreq->last_error = normalize_watch_error(req->r_result);
3203
queue_watch_error(lreq);
3204
}
3205
} else {
3206
dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3207
lreq->register_gen, req->r_ops[0].watch.gen);
3208
}
3209
3210
out:
3211
mutex_unlock(&lreq->lock);
3212
linger_put(lreq);
3213
}
3214
3215
static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3216
{
3217
struct ceph_osd_client *osdc = lreq->osdc;
3218
struct ceph_osd_request *req;
3219
int ret;
3220
3221
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3222
dout("%s PAUSERD\n", __func__);
3223
return;
3224
}
3225
3226
lreq->ping_sent = jiffies;
3227
dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3228
__func__, lreq, lreq->linger_id, lreq->ping_sent,
3229
lreq->register_gen);
3230
3231
if (lreq->ping_req) {
3232
if (lreq->ping_req->r_osd)
3233
cancel_linger_request(lreq->ping_req);
3234
ceph_osdc_put_request(lreq->ping_req);
3235
}
3236
3237
req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3238
BUG_ON(!req);
3239
3240
target_copy(&req->r_t, &lreq->t);
3241
osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3242
lreq->register_gen);
3243
req->r_callback = linger_ping_cb;
3244
3245
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3246
BUG_ON(ret);
3247
3248
req->r_priv = linger_get(lreq);
3249
req->r_linger = true;
3250
lreq->ping_req = req;
3251
3252
ceph_osdc_get_request(req);
3253
account_request(req);
3254
req->r_tid = atomic64_inc_return(&osdc->last_tid);
3255
link_request(lreq->osd, req);
3256
send_request(req);
3257
}
3258
3259
static void linger_submit(struct ceph_osd_linger_request *lreq)
3260
{
3261
struct ceph_osd_client *osdc = lreq->osdc;
3262
struct ceph_osd *osd;
3263
3264
down_write(&osdc->lock);
3265
linger_register(lreq);
3266
3267
calc_target(osdc, &lreq->t, false);
3268
osd = lookup_create_osd(osdc, lreq->t.osd, true);
3269
link_linger(osd, lreq);
3270
3271
send_linger(lreq);
3272
up_write(&osdc->lock);
3273
}
3274
3275
static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3276
{
3277
struct ceph_osd_client *osdc = lreq->osdc;
3278
struct ceph_osd_linger_request *lookup_lreq;
3279
3280
verify_osdc_wrlocked(osdc);
3281
3282
lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3283
lreq->linger_id);
3284
if (!lookup_lreq)
3285
return;
3286
3287
WARN_ON(lookup_lreq != lreq);
3288
erase_linger_mc(&osdc->linger_map_checks, lreq);
3289
linger_put(lreq);
3290
}
3291
3292
/*
 * @lreq has to be both registered and linked.
 */
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3296
{
3297
if (lreq->ping_req && lreq->ping_req->r_osd)
3298
cancel_linger_request(lreq->ping_req);
3299
if (lreq->reg_req && lreq->reg_req->r_osd)
3300
cancel_linger_request(lreq->reg_req);
3301
cancel_linger_map_check(lreq);
3302
unlink_linger(lreq->osd, lreq);
3303
linger_unregister(lreq);
3304
}
3305
3306
static void linger_cancel(struct ceph_osd_linger_request *lreq)
3307
{
3308
struct ceph_osd_client *osdc = lreq->osdc;
3309
3310
down_write(&osdc->lock);
3311
if (__linger_registered(lreq))
3312
__linger_cancel(lreq);
3313
up_write(&osdc->lock);
3314
}
3315
3316
static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3317
3318
static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3319
{
3320
struct ceph_osd_client *osdc = lreq->osdc;
3321
struct ceph_osdmap *map = osdc->osdmap;
3322
3323
verify_osdc_wrlocked(osdc);
3324
WARN_ON(!map->epoch);
3325
3326
if (lreq->register_gen) {
3327
lreq->map_dne_bound = map->epoch;
3328
dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3329
lreq, lreq->linger_id);
3330
} else {
3331
dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3332
__func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3333
map->epoch);
3334
}
3335
3336
if (lreq->map_dne_bound) {
3337
if (map->epoch >= lreq->map_dne_bound) {
3338
/* we had a new enough map */
3339
pr_info("linger_id %llu pool does not exist\n",
3340
lreq->linger_id);
3341
linger_reg_commit_complete(lreq, -ENOENT);
3342
__linger_cancel(lreq);
3343
}
3344
} else {
3345
send_linger_map_check(lreq);
3346
}
3347
}
3348
3349
static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3350
{
3351
struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3352
struct ceph_osd_linger_request *lreq;
3353
u64 linger_id = greq->private_data;
3354
3355
WARN_ON(greq->result || !greq->u.newest);
3356
3357
down_write(&osdc->lock);
3358
lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3359
if (!lreq) {
3360
dout("%s linger_id %llu dne\n", __func__, linger_id);
3361
goto out_unlock;
3362
}
3363
3364
dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3365
__func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3366
greq->u.newest);
3367
if (!lreq->map_dne_bound)
3368
lreq->map_dne_bound = greq->u.newest;
3369
erase_linger_mc(&osdc->linger_map_checks, lreq);
3370
check_linger_pool_dne(lreq);
3371
3372
linger_put(lreq);
3373
out_unlock:
3374
up_write(&osdc->lock);
3375
}
3376
3377
static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3378
{
3379
struct ceph_osd_client *osdc = lreq->osdc;
3380
struct ceph_osd_linger_request *lookup_lreq;
3381
int ret;
3382
3383
verify_osdc_wrlocked(osdc);
3384
3385
lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3386
lreq->linger_id);
3387
if (lookup_lreq) {
3388
WARN_ON(lookup_lreq != lreq);
3389
return;
3390
}
3391
3392
linger_get(lreq);
3393
insert_linger_mc(&osdc->linger_map_checks, lreq);
3394
ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3395
linger_map_check_cb, lreq->linger_id);
3396
WARN_ON(ret);
3397
}
3398
3399
static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3400
{
3401
int ret;
3402
3403
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3404
ret = wait_for_completion_killable(&lreq->reg_commit_wait);
3405
return ret ?: lreq->reg_commit_error;
3406
}
3407
3408
static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
3409
unsigned long timeout)
3410
{
3411
long left;
3412
3413
dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3414
left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
3415
ceph_timeout_jiffies(timeout));
3416
if (left <= 0)
3417
left = left ?: -ETIMEDOUT;
3418
else
3419
left = lreq->notify_finish_error; /* completed */
3420
3421
return left;
3422
}
3423
3424
/*
 * Timeout callback, called every N seconds.  When one or more OSD
 * requests have been active for more than N seconds, we send a keepalive
 * (tag + timestamp) to the owning OSD to ensure any communications channel
 * reset is detected.
 */
static void handle_timeout(struct work_struct *work)
3431
{
3432
struct ceph_osd_client *osdc =
3433
container_of(work, struct ceph_osd_client, timeout_work.work);
3434
struct ceph_options *opts = osdc->client->options;
3435
unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3436
unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3437
LIST_HEAD(slow_osds);
3438
struct rb_node *n, *p;
3439
3440
dout("%s osdc %p\n", __func__, osdc);
3441
down_write(&osdc->lock);
3442
3443
/*
 * ping osds that are a bit slow.  this ensures that if there
 * is a break in the TCP connection we will notice, and reopen
 * a connection with that osd (from the fault callback).
 */
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3449
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3450
bool found = false;
3451
3452
for (p = rb_first(&osd->o_requests); p; ) {
3453
struct ceph_osd_request *req =
3454
rb_entry(p, struct ceph_osd_request, r_node);
3455
3456
p = rb_next(p); /* abort_request() */
3457
3458
if (time_before(req->r_stamp, cutoff)) {
3459
dout(" req %p tid %llu on osd%d is laggy\n",
3460
req, req->r_tid, osd->o_osd);
3461
found = true;
3462
}
3463
if (opts->osd_request_timeout &&
3464
time_before(req->r_start_stamp, expiry_cutoff)) {
3465
pr_err_ratelimited("tid %llu on osd%d timeout\n",
3466
req->r_tid, osd->o_osd);
3467
abort_request(req, -ETIMEDOUT);
3468
}
3469
}
3470
for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3471
struct ceph_osd_linger_request *lreq =
3472
rb_entry(p, struct ceph_osd_linger_request, node);
3473
3474
dout(" lreq %p linger_id %llu is served by osd%d\n",
3475
lreq, lreq->linger_id, osd->o_osd);
3476
found = true;
3477
3478
mutex_lock(&lreq->lock);
3479
if (lreq->is_watch && lreq->committed && !lreq->last_error)
3480
send_linger_ping(lreq);
3481
mutex_unlock(&lreq->lock);
3482
}
3483
3484
if (found)
3485
list_move_tail(&osd->o_keepalive_item, &slow_osds);
3486
}
3487
3488
if (opts->osd_request_timeout) {
3489
for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3490
struct ceph_osd_request *req =
3491
rb_entry(p, struct ceph_osd_request, r_node);
3492
3493
p = rb_next(p); /* abort_request() */
3494
3495
if (time_before(req->r_start_stamp, expiry_cutoff)) {
3496
pr_err_ratelimited("tid %llu on osd%d timeout\n",
3497
req->r_tid, osdc->homeless_osd.o_osd);
3498
abort_request(req, -ETIMEDOUT);
3499
}
3500
}
3501
}
3502
3503
if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3504
maybe_request_map(osdc);
3505
3506
while (!list_empty(&slow_osds)) {
3507
struct ceph_osd *osd = list_first_entry(&slow_osds,
3508
struct ceph_osd,
3509
o_keepalive_item);
3510
list_del_init(&osd->o_keepalive_item);
3511
ceph_con_keepalive(&osd->o_con);
3512
}
3513
3514
up_write(&osdc->lock);
3515
schedule_delayed_work(&osdc->timeout_work,
3516
osdc->client->options->osd_keepalive_timeout);
3517
}
3518
3519
static void handle_osds_timeout(struct work_struct *work)
3520
{
3521
struct ceph_osd_client *osdc =
3522
container_of(work, struct ceph_osd_client,
3523
osds_timeout_work.work);
3524
unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3525
struct ceph_osd *osd, *nosd;
3526
3527
dout("%s osdc %p\n", __func__, osdc);
3528
down_write(&osdc->lock);
3529
list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3530
if (time_before(jiffies, osd->lru_ttl))
3531
break;
3532
3533
WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3534
WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3535
close_osd(osd);
3536
}
3537
3538
up_write(&osdc->lock);
3539
schedule_delayed_work(&osdc->osds_timeout_work,
3540
round_jiffies_relative(delay));
3541
}
3542
3543
static int ceph_oloc_decode(void **p, void *end,
3544
struct ceph_object_locator *oloc)
3545
{
3546
u8 struct_v, struct_cv;
3547
u32 len;
3548
void *struct_end;
3549
int ret = 0;
3550
3551
ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3552
struct_v = ceph_decode_8(p);
3553
struct_cv = ceph_decode_8(p);
3554
if (struct_v < 3) {
3555
pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3556
struct_v, struct_cv);
3557
goto e_inval;
3558
}
3559
if (struct_cv > 6) {
3560
pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3561
struct_v, struct_cv);
3562
goto e_inval;
3563
}
3564
len = ceph_decode_32(p);
3565
ceph_decode_need(p, end, len, e_inval);
3566
struct_end = *p + len;
3567
3568
oloc->pool = ceph_decode_64(p);
3569
*p += 4; /* skip preferred */
3570
3571
len = ceph_decode_32(p);
3572
if (len > 0) {
3573
pr_warn("ceph_object_locator::key is set\n");
3574
goto e_inval;
3575
}
3576
3577
if (struct_v >= 5) {
3578
bool changed = false;
3579
3580
len = ceph_decode_32(p);
3581
if (len > 0) {
3582
ceph_decode_need(p, end, len, e_inval);
3583
if (!oloc->pool_ns ||
3584
ceph_compare_string(oloc->pool_ns, *p, len))
3585
changed = true;
3586
*p += len;
3587
} else {
3588
if (oloc->pool_ns)
3589
changed = true;
3590
}
3591
if (changed) {
3592
/* redirect changes namespace */
3593
pr_warn("ceph_object_locator::nspace is changed\n");
3594
goto e_inval;
3595
}
3596
}
3597
3598
if (struct_v >= 6) {
3599
s64 hash = ceph_decode_64(p);
3600
if (hash != -1) {
3601
pr_warn("ceph_object_locator::hash is set\n");
3602
goto e_inval;
3603
}
3604
}
3605
3606
/* skip the rest */
3607
*p = struct_end;
3608
out:
3609
return ret;
3610
3611
e_inval:
3612
ret = -EINVAL;
3613
goto out;
3614
}
3615
3616
static int ceph_redirect_decode(void **p, void *end,
3617
struct ceph_request_redirect *redir)
3618
{
3619
u8 struct_v, struct_cv;
3620
u32 len;
3621
void *struct_end;
3622
int ret;
3623
3624
ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3625
struct_v = ceph_decode_8(p);
3626
struct_cv = ceph_decode_8(p);
3627
if (struct_cv > 1) {
3628
pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3629
struct_v, struct_cv);
3630
goto e_inval;
3631
}
3632
len = ceph_decode_32(p);
3633
ceph_decode_need(p, end, len, e_inval);
3634
struct_end = *p + len;
3635
3636
ret = ceph_oloc_decode(p, end, &redir->oloc);
3637
if (ret)
3638
goto out;
3639
3640
len = ceph_decode_32(p);
3641
if (len > 0) {
3642
pr_warn("ceph_request_redirect::object_name is set\n");
3643
goto e_inval;
3644
}
3645
3646
/* skip the rest */
3647
*p = struct_end;
3648
out:
3649
return ret;
3650
3651
e_inval:
3652
ret = -EINVAL;
3653
goto out;
3654
}
3655
3656
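/*
 * Host-endian view of the interesting fields of an incoming
 * MOSDOpReply front, filled in by decode_MOSDOpReply().
 */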
struct MOSDOpReply {
3657
struct ceph_pg pgid;
3658
u64 flags;
3659
int result;
3660
u32 epoch;
3661
int num_ops;
3662
u32 outdata_len[CEPH_OSD_MAX_OPS];
3663
s32 rval[CEPH_OSD_MAX_OPS];
3664
int retry_attempt;
3665
struct ceph_eversion replay_version;
3666
u64 user_version;
3667
struct ceph_request_redirect redirect;
3668
};
3669
3670
static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3671
{
3672
void *p = msg->front.iov_base;
3673
void *const end = p + msg->front.iov_len;
3674
u16 version = le16_to_cpu(msg->hdr.version);
3675
struct ceph_eversion bad_replay_version;
3676
u8 decode_redir;
3677
u32 len;
3678
int ret;
3679
int i;
3680
3681
ceph_decode_32_safe(&p, end, len, e_inval);
3682
ceph_decode_need(&p, end, len, e_inval);
3683
p += len; /* skip oid */
3684
3685
ret = ceph_decode_pgid(&p, end, &m->pgid);
3686
if (ret)
3687
return ret;
3688
3689
ceph_decode_64_safe(&p, end, m->flags, e_inval);
3690
ceph_decode_32_safe(&p, end, m->result, e_inval);
3691
ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3692
memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3693
p += sizeof(bad_replay_version);
3694
ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3695
3696
ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3697
if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3698
goto e_inval;
3699
3700
ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3701
e_inval);
3702
for (i = 0; i < m->num_ops; i++) {
3703
struct ceph_osd_op *op = p;
3704
3705
m->outdata_len[i] = le32_to_cpu(op->payload_len);
3706
p += sizeof(*op);
3707
}
3708
3709
ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3710
for (i = 0; i < m->num_ops; i++)
3711
ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3712
3713
if (version >= 5) {
3714
ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3715
memcpy(&m->replay_version, p, sizeof(m->replay_version));
3716
p += sizeof(m->replay_version);
3717
ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3718
} else {
3719
m->replay_version = bad_replay_version; /* struct */
3720
m->user_version = le64_to_cpu(m->replay_version.version);
3721
}
3722
3723
if (version >= 6) {
3724
if (version >= 7)
3725
ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3726
else
3727
decode_redir = 1;
3728
} else {
3729
decode_redir = 0;
3730
}
3731
3732
if (decode_redir) {
3733
ret = ceph_redirect_decode(&p, end, &m->redirect);
3734
if (ret)
3735
return ret;
3736
} else {
3737
ceph_oloc_init(&m->redirect.oloc);
3738
}
3739
3740
return 0;
3741
3742
e_inval:
3743
return -EINVAL;
3744
}
3745
3746
/*
 * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
 * specified.
 */
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3751
{
3752
struct ceph_osd_client *osdc = osd->o_osdc;
3753
struct ceph_osd_request *req;
3754
struct MOSDOpReply m;
3755
u64 tid = le64_to_cpu(msg->hdr.tid);
3756
u32 data_len = 0;
3757
int ret;
3758
int i;
3759
3760
dout("%s msg %p tid %llu\n", __func__, msg, tid);
3761
3762
down_read(&osdc->lock);
3763
if (!osd_registered(osd)) {
3764
dout("%s osd%d unknown\n", __func__, osd->o_osd);
3765
goto out_unlock_osdc;
3766
}
3767
WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3768
3769
mutex_lock(&osd->lock);
3770
req = lookup_request(&osd->o_requests, tid);
3771
if (!req) {
3772
dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3773
goto out_unlock_session;
3774
}
3775
3776
m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3777
ret = decode_MOSDOpReply(msg, &m);
3778
m.redirect.oloc.pool_ns = NULL;
3779
if (ret) {
3780
pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3781
req->r_tid, ret);
3782
ceph_msg_dump(msg);
3783
goto fail_request;
3784
}
3785
dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3786
__func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3787
m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3788
le64_to_cpu(m.replay_version.version), m.user_version);
3789
3790
if (m.retry_attempt >= 0) {
3791
if (m.retry_attempt != req->r_attempts - 1) {
3792
dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3793
req, req->r_tid, m.retry_attempt,
3794
req->r_attempts - 1);
3795
goto out_unlock_session;
3796
}
3797
} else {
3798
WARN_ON(1); /* MOSDOpReply v4 is assumed */
3799
}
3800
3801
if (!ceph_oloc_empty(&m.redirect.oloc)) {
3802
dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3803
m.redirect.oloc.pool);
3804
unlink_request(osd, req);
3805
mutex_unlock(&osd->lock);
3806
3807
/*
3808
* Not ceph_oloc_copy() - changing pool_ns is not
3809
* supported.
3810
*/
3811
req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3812
req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3813
CEPH_OSD_FLAG_IGNORE_OVERLAY |
3814
CEPH_OSD_FLAG_IGNORE_CACHE;
3815
req->r_tid = 0;
3816
__submit_request(req, false);
3817
goto out_unlock_osdc;
3818
}
3819
3820
if (m.result == -EAGAIN) {
3821
dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3822
unlink_request(osd, req);
3823
mutex_unlock(&osd->lock);
3824
3825
/*
3826
* The object is missing on the replica or not (yet)
3827
* readable. Clear pgid to force a resend to the primary
3828
* via legacy_change.
3829
*/
3830
req->r_t.pgid.pool = 0;
3831
req->r_t.pgid.seed = 0;
3832
WARN_ON(!req->r_t.used_replica);
3833
req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3834
CEPH_OSD_FLAG_LOCALIZE_READS);
3835
req->r_tid = 0;
3836
__submit_request(req, false);
3837
goto out_unlock_osdc;
3838
}
3839
3840
if (m.num_ops != req->r_num_ops) {
3841
pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3842
req->r_num_ops, req->r_tid);
3843
goto fail_request;
3844
}
3845
for (i = 0; i < req->r_num_ops; i++) {
3846
dout(" req %p tid %llu op %d rval %d len %u\n", req,
3847
req->r_tid, i, m.rval[i], m.outdata_len[i]);
3848
req->r_ops[i].rval = m.rval[i];
3849
req->r_ops[i].outdata_len = m.outdata_len[i];
3850
data_len += m.outdata_len[i];
3851
}
3852
if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3853
pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3854
le32_to_cpu(msg->hdr.data_len), req->r_tid);
3855
goto fail_request;
3856
}
3857
dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3858
req, req->r_tid, m.result, data_len);
3859
3860
/*
3861
* Since we only ever request ONDISK, we should only ever get
3862
* one (type of) reply back.
3863
*/
3864
WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3865
req->r_version = m.user_version;
3866
req->r_result = m.result ?: data_len;
3867
finish_request(req);
3868
mutex_unlock(&osd->lock);
3869
up_read(&osdc->lock);
3870
3871
__complete_request(req);
3872
return;
3873
3874
fail_request:
3875
complete_request(req, -EIO);
3876
out_unlock_session:
3877
mutex_unlock(&osd->lock);
3878
out_unlock_osdc:
3879
up_read(&osdc->lock);
3880
}
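/*
 * Note on the pattern above (descriptive, not normative): handle_reply()
 * takes osdc->lock for read and then osd->lock for the session, in that
 * order.  Replies that merely bounce the request (a redirect oloc, or
 * -EAGAIN from a replica read) unlink it, drop osd->lock and resubmit
 * while still holding the read-locked osdc->lock; only a "real" result
 * reaches finish_request(), and __complete_request() runs with no locks
 * held.
 */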
3881
3882
static void set_pool_was_full(struct ceph_osd_client *osdc)
3883
{
3884
struct rb_node *n;
3885
3886
for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3887
struct ceph_pg_pool_info *pi =
3888
rb_entry(n, struct ceph_pg_pool_info, node);
3889
3890
pi->was_full = __pool_full(pi);
3891
}
3892
}
3893
3894
static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3895
{
3896
struct ceph_pg_pool_info *pi;
3897
3898
pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3899
if (!pi)
3900
return false;
3901
3902
return pi->was_full && !__pool_full(pi);
3903
}
3904
3905
static enum calc_target_result
3906
recalc_linger_target(struct ceph_osd_linger_request *lreq)
3907
{
3908
struct ceph_osd_client *osdc = lreq->osdc;
3909
enum calc_target_result ct_res;
3910
3911
ct_res = calc_target(osdc, &lreq->t, true);
3912
if (ct_res == CALC_TARGET_NEED_RESEND) {
3913
struct ceph_osd *osd;
3914
3915
osd = lookup_create_osd(osdc, lreq->t.osd, true);
3916
if (osd != lreq->osd) {
3917
unlink_linger(lreq->osd, lreq);
3918
link_linger(osd, lreq);
3919
}
3920
}
3921
3922
return ct_res;
3923
}
3924
3925
/*
3926
* Requeue requests whose mapping to an OSD has changed.
3927
*/
3928
static void scan_requests(struct ceph_osd *osd,
3929
bool force_resend,
3930
bool cleared_full,
3931
bool check_pool_cleared_full,
3932
struct rb_root *need_resend,
3933
struct list_head *need_resend_linger)
3934
{
3935
struct ceph_osd_client *osdc = osd->o_osdc;
3936
struct rb_node *n;
3937
bool force_resend_writes;
3938
3939
for (n = rb_first(&osd->o_linger_requests); n; ) {
3940
struct ceph_osd_linger_request *lreq =
3941
rb_entry(n, struct ceph_osd_linger_request, node);
3942
enum calc_target_result ct_res;
3943
3944
n = rb_next(n); /* recalc_linger_target() */
3945
3946
dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3947
lreq->linger_id);
3948
ct_res = recalc_linger_target(lreq);
3949
switch (ct_res) {
3950
case CALC_TARGET_NO_ACTION:
3951
force_resend_writes = cleared_full ||
3952
(check_pool_cleared_full &&
3953
pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3954
if (!force_resend && !force_resend_writes)
3955
break;
3956
3957
fallthrough;
3958
case CALC_TARGET_NEED_RESEND:
3959
cancel_linger_map_check(lreq);
3960
/*
3961
* scan_requests() for the previous epoch(s)
3962
* may have already added it to the list, since
3963
* it's not unlinked here.
3964
*/
3965
if (list_empty(&lreq->scan_item))
3966
list_add_tail(&lreq->scan_item, need_resend_linger);
3967
break;
3968
case CALC_TARGET_POOL_DNE:
3969
list_del_init(&lreq->scan_item);
3970
check_linger_pool_dne(lreq);
3971
break;
3972
}
3973
}
3974
3975
for (n = rb_first(&osd->o_requests); n; ) {
3976
struct ceph_osd_request *req =
3977
rb_entry(n, struct ceph_osd_request, r_node);
3978
enum calc_target_result ct_res;
3979
3980
n = rb_next(n); /* unlink_request(), check_pool_dne() */
3981
3982
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3983
ct_res = calc_target(osdc, &req->r_t, false);
3984
switch (ct_res) {
3985
case CALC_TARGET_NO_ACTION:
3986
force_resend_writes = cleared_full ||
3987
(check_pool_cleared_full &&
3988
pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3989
if (!force_resend &&
3990
(!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3991
!force_resend_writes))
3992
break;
3993
3994
fallthrough;
3995
case CALC_TARGET_NEED_RESEND:
3996
cancel_map_check(req);
3997
unlink_request(osd, req);
3998
insert_request(need_resend, req);
3999
break;
4000
case CALC_TARGET_POOL_DNE:
4001
check_pool_dne(req);
4002
break;
4003
}
4004
}
4005
}
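/*
 * calc_target() results as consumed above (a summary of the existing
 * behaviour, not new policy):
 *
 *   CALC_TARGET_NO_ACTION   - mapping unchanged; resend only if forced
 *                             (skipped map, or a pool cleared FULL for
 *                             writes)
 *   CALC_TARGET_NEED_RESEND - mapping changed; move to need_resend or
 *                             need_resend_linger
 *   CALC_TARGET_POOL_DNE    - pool may be gone; hand off to the map check
 *                             machinery
 */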
4006
4007
static int handle_one_map(struct ceph_osd_client *osdc,
4008
void *p, void *end, bool incremental,
4009
struct rb_root *need_resend,
4010
struct list_head *need_resend_linger)
4011
{
4012
struct ceph_osdmap *newmap;
4013
struct rb_node *n;
4014
bool skipped_map = false;
4015
bool was_full;
4016
4017
was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4018
set_pool_was_full(osdc);
4019
4020
if (incremental)
4021
newmap = osdmap_apply_incremental(&p, end,
4022
ceph_msgr2(osdc->client),
4023
osdc->osdmap);
4024
else
4025
newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client));
4026
if (IS_ERR(newmap))
4027
return PTR_ERR(newmap);
4028
4029
if (newmap != osdc->osdmap) {
4030
/*
4031
* Preserve ->was_full before destroying the old map.
4032
* For pools that weren't in the old map, ->was_full
4033
* should be false.
4034
*/
4035
for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
4036
struct ceph_pg_pool_info *pi =
4037
rb_entry(n, struct ceph_pg_pool_info, node);
4038
struct ceph_pg_pool_info *old_pi;
4039
4040
old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
4041
if (old_pi)
4042
pi->was_full = old_pi->was_full;
4043
else
4044
WARN_ON(pi->was_full);
4045
}
4046
4047
if (osdc->osdmap->epoch &&
4048
osdc->osdmap->epoch + 1 < newmap->epoch) {
4049
WARN_ON(incremental);
4050
skipped_map = true;
4051
}
4052
4053
ceph_osdmap_destroy(osdc->osdmap);
4054
osdc->osdmap = newmap;
4055
}
4056
4057
was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4058
scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
4059
need_resend, need_resend_linger);
4060
4061
for (n = rb_first(&osdc->osds); n; ) {
4062
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4063
4064
n = rb_next(n); /* close_osd() */
4065
4066
scan_requests(osd, skipped_map, was_full, true, need_resend,
4067
need_resend_linger);
4068
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
4069
memcmp(&osd->o_con.peer_addr,
4070
ceph_osd_addr(osdc->osdmap, osd->o_osd),
4071
sizeof(struct ceph_entity_addr)))
4072
close_osd(osd);
4073
}
4074
4075
return 0;
4076
}
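/*
 * skipped_map is set when a full map jumps more than one epoch ahead of
 * what we had, because any intermediate incrementals were never seen --
 * scan_requests() then treats every request as force_resend.  was_full is
 * reduced to "the cluster just cleared the FULL flag", which only forces
 * resends of writes.
 */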
4077
4078
static void kick_requests(struct ceph_osd_client *osdc,
4079
struct rb_root *need_resend,
4080
struct list_head *need_resend_linger)
4081
{
4082
struct ceph_osd_linger_request *lreq, *nlreq;
4083
enum calc_target_result ct_res;
4084
struct rb_node *n;
4085
4086
/* make sure need_resend targets reflect latest map */
4087
for (n = rb_first(need_resend); n; ) {
4088
struct ceph_osd_request *req =
4089
rb_entry(n, struct ceph_osd_request, r_node);
4090
4091
n = rb_next(n);
4092
4093
if (req->r_t.epoch < osdc->osdmap->epoch) {
4094
ct_res = calc_target(osdc, &req->r_t, false);
4095
if (ct_res == CALC_TARGET_POOL_DNE) {
4096
erase_request(need_resend, req);
4097
check_pool_dne(req);
4098
}
4099
}
4100
}
4101
4102
for (n = rb_first(need_resend); n; ) {
4103
struct ceph_osd_request *req =
4104
rb_entry(n, struct ceph_osd_request, r_node);
4105
struct ceph_osd *osd;
4106
4107
n = rb_next(n);
4108
erase_request(need_resend, req); /* before link_request() */
4109
4110
osd = lookup_create_osd(osdc, req->r_t.osd, true);
4111
link_request(osd, req);
4112
if (!req->r_linger) {
4113
if (!osd_homeless(osd) && !req->r_t.paused)
4114
send_request(req);
4115
} else {
4116
cancel_linger_request(req);
4117
}
4118
}
4119
4120
list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4121
if (!osd_homeless(lreq->osd))
4122
send_linger(lreq);
4123
4124
list_del_init(&lreq->scan_item);
4125
}
4126
}
4127
4128
/*
4129
* Process updated osd map.
4130
*
4131
* The message contains any number of incremental and full maps, normally
4132
* indicating some sort of topology change in the cluster. Kick requests
4133
* off to different OSDs as needed.
4134
*/
4135
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4136
{
4137
void *p = msg->front.iov_base;
4138
void *const end = p + msg->front.iov_len;
4139
u32 nr_maps, maplen;
4140
u32 epoch;
4141
struct ceph_fsid fsid;
4142
struct rb_root need_resend = RB_ROOT;
4143
LIST_HEAD(need_resend_linger);
4144
bool handled_incremental = false;
4145
bool was_pauserd, was_pausewr;
4146
bool pauserd, pausewr;
4147
int err;
4148
4149
dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4150
down_write(&osdc->lock);
4151
4152
/* verify fsid */
4153
ceph_decode_need(&p, end, sizeof(fsid), bad);
4154
ceph_decode_copy(&p, &fsid, sizeof(fsid));
4155
if (ceph_check_fsid(osdc->client, &fsid) < 0)
4156
goto bad;
4157
4158
was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4159
was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4160
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4161
have_pool_full(osdc);
4162
4163
/* incremental maps */
4164
ceph_decode_32_safe(&p, end, nr_maps, bad);
4165
dout(" %d inc maps\n", nr_maps);
4166
while (nr_maps > 0) {
4167
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4168
epoch = ceph_decode_32(&p);
4169
maplen = ceph_decode_32(&p);
4170
ceph_decode_need(&p, end, maplen, bad);
4171
if (osdc->osdmap->epoch &&
4172
osdc->osdmap->epoch + 1 == epoch) {
4173
dout("applying incremental map %u len %d\n",
4174
epoch, maplen);
4175
err = handle_one_map(osdc, p, p + maplen, true,
4176
&need_resend, &need_resend_linger);
4177
if (err)
4178
goto bad;
4179
handled_incremental = true;
4180
} else {
4181
dout("ignoring incremental map %u len %d\n",
4182
epoch, maplen);
4183
}
4184
p += maplen;
4185
nr_maps--;
4186
}
4187
if (handled_incremental)
4188
goto done;
4189
4190
/* full maps */
4191
ceph_decode_32_safe(&p, end, nr_maps, bad);
4192
dout(" %d full maps\n", nr_maps);
4193
while (nr_maps) {
4194
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4195
epoch = ceph_decode_32(&p);
4196
maplen = ceph_decode_32(&p);
4197
ceph_decode_need(&p, end, maplen, bad);
4198
if (nr_maps > 1) {
4199
dout("skipping non-latest full map %u len %d\n",
4200
epoch, maplen);
4201
} else if (osdc->osdmap->epoch >= epoch) {
4202
dout("skipping full map %u len %d, "
4203
"older than our %u\n", epoch, maplen,
4204
osdc->osdmap->epoch);
4205
} else {
4206
dout("taking full map %u len %d\n", epoch, maplen);
4207
err = handle_one_map(osdc, p, p + maplen, false,
4208
&need_resend, &need_resend_linger);
4209
if (err)
4210
goto bad;
4211
}
4212
p += maplen;
4213
nr_maps--;
4214
}
4215
4216
done:
4217
/*
4218
* subscribe to subsequent osdmap updates if full to ensure
4219
* we find out when we are no longer full and stop returning
4220
* ENOSPC.
4221
*/
4222
pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4223
pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4224
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4225
have_pool_full(osdc);
4226
if (was_pauserd || was_pausewr || pauserd || pausewr ||
4227
osdc->osdmap->epoch < osdc->epoch_barrier)
4228
maybe_request_map(osdc);
4229
4230
kick_requests(osdc, &need_resend, &need_resend_linger);
4231
4232
ceph_osdc_abort_on_full(osdc);
4233
ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4234
osdc->osdmap->epoch);
4235
up_write(&osdc->lock);
4236
wake_up_all(&osdc->client->auth_wq);
4237
return;
4238
4239
bad:
4240
pr_err("osdc handle_map corrupt msg\n");
4241
ceph_msg_dump(msg);
4242
up_write(&osdc->lock);
4243
}
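/*
 * For reference (a paraphrase of the decode above, not an independent
 * spec): the OSDMAP message front is
 *
 *   fsid
 *   u32 nr_inc_maps,  then nr_inc_maps  * { u32 epoch, u32 len, data }
 *   u32 nr_full_maps, then nr_full_maps * { u32 epoch, u32 len, data }
 *
 * Incrementals are applied only when they extend our epoch by exactly
 * one; otherwise the newest full map, if any, is taken instead.
 */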
4244
4245
/*
4246
* Resubmit requests pending on the given osd.
4247
*/
4248
static void kick_osd_requests(struct ceph_osd *osd)
4249
{
4250
struct rb_node *n;
4251
4252
clear_backoffs(osd);
4253
4254
for (n = rb_first(&osd->o_requests); n; ) {
4255
struct ceph_osd_request *req =
4256
rb_entry(n, struct ceph_osd_request, r_node);
4257
4258
n = rb_next(n); /* cancel_linger_request() */
4259
4260
if (!req->r_linger) {
4261
if (!req->r_t.paused)
4262
send_request(req);
4263
} else {
4264
cancel_linger_request(req);
4265
}
4266
}
4267
for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4268
struct ceph_osd_linger_request *lreq =
4269
rb_entry(n, struct ceph_osd_linger_request, node);
4270
4271
send_linger(lreq);
4272
}
4273
}
4274
4275
/*
4276
* If the osd connection drops, we need to resubmit all requests.
4277
*/
4278
static void osd_fault(struct ceph_connection *con)
4279
{
4280
struct ceph_osd *osd = con->private;
4281
struct ceph_osd_client *osdc = osd->o_osdc;
4282
4283
dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4284
4285
down_write(&osdc->lock);
4286
if (!osd_registered(osd)) {
4287
dout("%s osd%d unknown\n", __func__, osd->o_osd);
4288
goto out_unlock;
4289
}
4290
4291
osd->o_sparse_op_idx = -1;
4292
ceph_init_sparse_read(&osd->o_sparse_read);
4293
4294
if (!reopen_osd(osd))
4295
kick_osd_requests(osd);
4296
maybe_request_map(osdc);
4297
4298
out_unlock:
4299
up_write(&osdc->lock);
4300
}
4301
4302
struct MOSDBackoff {
4303
struct ceph_spg spgid;
4304
u32 map_epoch;
4305
u8 op;
4306
u64 id;
4307
struct ceph_hobject_id *begin;
4308
struct ceph_hobject_id *end;
4309
};
4310
4311
static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4312
{
4313
void *p = msg->front.iov_base;
4314
void *const end = p + msg->front.iov_len;
4315
u8 struct_v;
4316
u32 struct_len;
4317
int ret;
4318
4319
ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4320
if (ret)
4321
return ret;
4322
4323
ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4324
if (ret)
4325
return ret;
4326
4327
ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4328
ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4329
ceph_decode_8_safe(&p, end, m->op, e_inval);
4330
ceph_decode_64_safe(&p, end, m->id, e_inval);
4331
4332
m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4333
if (!m->begin)
4334
return -ENOMEM;
4335
4336
ret = decode_hoid(&p, end, m->begin);
4337
if (ret) {
4338
free_hoid(m->begin);
4339
return ret;
4340
}
4341
4342
m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4343
if (!m->end) {
4344
free_hoid(m->begin);
4345
return -ENOMEM;
4346
}
4347
4348
ret = decode_hoid(&p, end, m->end);
4349
if (ret) {
4350
free_hoid(m->begin);
4351
free_hoid(m->end);
4352
return ret;
4353
}
4354
4355
return 0;
4356
4357
e_inval:
4358
return -EINVAL;
4359
}
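/*
 * Ownership note: on success decode_MOSDBackoff() leaves m->begin and
 * m->end allocated.  handle_backoff_block() steals them into the
 * installed backoff (NULLing the fields), and handle_backoff() frees
 * whatever is still attached once the op has been dispatched.
 */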
4360
4361
static struct ceph_msg *create_backoff_message(
4362
const struct ceph_osd_backoff *backoff,
4363
u32 map_epoch)
4364
{
4365
struct ceph_msg *msg;
4366
void *p, *end;
4367
int msg_size;
4368
4369
msg_size = CEPH_ENCODING_START_BLK_LEN +
4370
CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4371
msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4372
msg_size += CEPH_ENCODING_START_BLK_LEN +
4373
hoid_encoding_size(backoff->begin);
4374
msg_size += CEPH_ENCODING_START_BLK_LEN +
4375
hoid_encoding_size(backoff->end);
4376
4377
msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4378
if (!msg)
4379
return NULL;
4380
4381
p = msg->front.iov_base;
4382
end = p + msg->front_alloc_len;
4383
4384
encode_spgid(&p, &backoff->spgid);
4385
ceph_encode_32(&p, map_epoch);
4386
ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4387
ceph_encode_64(&p, backoff->id);
4388
encode_hoid(&p, end, backoff->begin);
4389
encode_hoid(&p, end, backoff->end);
4390
BUG_ON(p != end);
4391
4392
msg->front.iov_len = p - msg->front.iov_base;
4393
msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4394
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4395
4396
return msg;
4397
}
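/*
 * Size accounting sketch for the message above (illustrative): a v1
 * MOSDBackoff ack is spgid (encoding start block + pgid + shard byte),
 * u32 map_epoch, u8 op, u64 id, and two hoids, each preceded by its own
 * encoding start block -- hence the msg_size sum computed before
 * ceph_msg_new().  BUG_ON(p != end) then asserts the estimate was exact.
 */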
4398
4399
static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4400
{
4401
struct ceph_spg_mapping *spg;
4402
struct ceph_osd_backoff *backoff;
4403
struct ceph_msg *msg;
4404
4405
dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4406
m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4407
4408
spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4409
if (!spg) {
4410
spg = alloc_spg_mapping();
4411
if (!spg) {
4412
pr_err("%s failed to allocate spg\n", __func__);
4413
return;
4414
}
4415
spg->spgid = m->spgid; /* struct */
4416
insert_spg_mapping(&osd->o_backoff_mappings, spg);
4417
}
4418
4419
backoff = alloc_backoff();
4420
if (!backoff) {
4421
pr_err("%s failed to allocate backoff\n", __func__);
4422
return;
4423
}
4424
backoff->spgid = m->spgid; /* struct */
4425
backoff->id = m->id;
4426
backoff->begin = m->begin;
4427
m->begin = NULL; /* backoff now owns this */
4428
backoff->end = m->end;
4429
m->end = NULL; /* ditto */
4430
4431
insert_backoff(&spg->backoffs, backoff);
4432
insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4433
4434
/*
4435
* Ack with original backoff's epoch so that the OSD can
4436
* discard this if there was a PG split.
4437
*/
4438
msg = create_backoff_message(backoff, m->map_epoch);
4439
if (!msg) {
4440
pr_err("%s failed to allocate msg\n", __func__);
4441
return;
4442
}
4443
ceph_con_send(&osd->o_con, msg);
4444
}
4445
4446
static bool target_contained_by(const struct ceph_osd_request_target *t,
4447
const struct ceph_hobject_id *begin,
4448
const struct ceph_hobject_id *end)
4449
{
4450
struct ceph_hobject_id hoid;
4451
int cmp;
4452
4453
hoid_fill_from_target(&hoid, t);
4454
cmp = hoid_compare(&hoid, begin);
4455
return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4456
}
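/*
 * In other words, a target falls under a backoff range when
 * begin <= hoid(target) < end in hoid_compare() order -- a half-open
 * interval, matching how the range is checked on unblock below.
 */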
4457
4458
static void handle_backoff_unblock(struct ceph_osd *osd,
4459
const struct MOSDBackoff *m)
4460
{
4461
struct ceph_spg_mapping *spg;
4462
struct ceph_osd_backoff *backoff;
4463
struct rb_node *n;
4464
4465
dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4466
m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4467
4468
backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4469
if (!backoff) {
4470
pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4471
__func__, osd->o_osd, m->spgid.pgid.pool,
4472
m->spgid.pgid.seed, m->spgid.shard, m->id);
4473
return;
4474
}
4475
4476
if (hoid_compare(backoff->begin, m->begin) &&
4477
hoid_compare(backoff->end, m->end)) {
4478
pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4479
__func__, osd->o_osd, m->spgid.pgid.pool,
4480
m->spgid.pgid.seed, m->spgid.shard, m->id);
4481
/* unblock it anyway... */
4482
}
4483
4484
spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4485
BUG_ON(!spg);
4486
4487
erase_backoff(&spg->backoffs, backoff);
4488
erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4489
free_backoff(backoff);
4490
4491
if (RB_EMPTY_ROOT(&spg->backoffs)) {
4492
erase_spg_mapping(&osd->o_backoff_mappings, spg);
4493
free_spg_mapping(spg);
4494
}
4495
4496
for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4497
struct ceph_osd_request *req =
4498
rb_entry(n, struct ceph_osd_request, r_node);
4499
4500
if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4501
/*
4502
* Match against @m, not @backoff -- the PG may
4503
* have split on the OSD.
4504
*/
4505
if (target_contained_by(&req->r_t, m->begin, m->end)) {
4506
/*
4507
* If no other installed backoff applies,
4508
* resend.
4509
*/
4510
send_request(req);
4511
}
4512
}
4513
}
4514
}
4515
4516
static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4517
{
4518
struct ceph_osd_client *osdc = osd->o_osdc;
4519
struct MOSDBackoff m;
4520
int ret;
4521
4522
down_read(&osdc->lock);
4523
if (!osd_registered(osd)) {
4524
dout("%s osd%d unknown\n", __func__, osd->o_osd);
4525
up_read(&osdc->lock);
4526
return;
4527
}
4528
WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4529
4530
mutex_lock(&osd->lock);
4531
ret = decode_MOSDBackoff(msg, &m);
4532
if (ret) {
4533
pr_err("failed to decode MOSDBackoff: %d\n", ret);
4534
ceph_msg_dump(msg);
4535
goto out_unlock;
4536
}
4537
4538
switch (m.op) {
4539
case CEPH_OSD_BACKOFF_OP_BLOCK:
4540
handle_backoff_block(osd, &m);
4541
break;
4542
case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4543
handle_backoff_unblock(osd, &m);
4544
break;
4545
default:
4546
pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4547
}
4548
4549
free_hoid(m.begin);
4550
free_hoid(m.end);
4551
4552
out_unlock:
4553
mutex_unlock(&osd->lock);
4554
up_read(&osdc->lock);
4555
}
4556
4557
/*
4558
* Process osd watch notifications
4559
*/
4560
static void handle_watch_notify(struct ceph_osd_client *osdc,
4561
struct ceph_msg *msg)
4562
{
4563
void *p = msg->front.iov_base;
4564
void *const end = p + msg->front.iov_len;
4565
struct ceph_osd_linger_request *lreq;
4566
struct linger_work *lwork;
4567
u8 proto_ver, opcode;
4568
u64 cookie, notify_id;
4569
u64 notifier_id = 0;
4570
s32 return_code = 0;
4571
void *payload = NULL;
4572
u32 payload_len = 0;
4573
4574
ceph_decode_8_safe(&p, end, proto_ver, bad);
4575
ceph_decode_8_safe(&p, end, opcode, bad);
4576
ceph_decode_64_safe(&p, end, cookie, bad);
4577
p += 8; /* skip ver */
4578
ceph_decode_64_safe(&p, end, notify_id, bad);
4579
4580
if (proto_ver >= 1) {
4581
ceph_decode_32_safe(&p, end, payload_len, bad);
4582
ceph_decode_need(&p, end, payload_len, bad);
4583
payload = p;
4584
p += payload_len;
4585
}
4586
4587
if (le16_to_cpu(msg->hdr.version) >= 2)
4588
ceph_decode_32_safe(&p, end, return_code, bad);
4589
4590
if (le16_to_cpu(msg->hdr.version) >= 3)
4591
ceph_decode_64_safe(&p, end, notifier_id, bad);
4592
4593
down_read(&osdc->lock);
4594
lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4595
if (!lreq) {
4596
dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4597
cookie);
4598
goto out_unlock_osdc;
4599
}
4600
4601
mutex_lock(&lreq->lock);
4602
dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4603
opcode, cookie, lreq, lreq->is_watch);
4604
if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4605
if (!lreq->last_error) {
4606
lreq->last_error = -ENOTCONN;
4607
queue_watch_error(lreq);
4608
}
4609
} else if (!lreq->is_watch) {
4610
/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4611
if (lreq->notify_id && lreq->notify_id != notify_id) {
4612
dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4613
lreq->notify_id, notify_id);
4614
} else if (!completion_done(&lreq->notify_finish_wait)) {
4615
struct ceph_msg_data *data =
4616
msg->num_data_items ? &msg->data[0] : NULL;
4617
4618
if (data) {
4619
if (lreq->preply_pages) {
4620
WARN_ON(data->type !=
4621
CEPH_MSG_DATA_PAGES);
4622
*lreq->preply_pages = data->pages;
4623
*lreq->preply_len = data->length;
4624
data->own_pages = false;
4625
}
4626
}
4627
lreq->notify_finish_error = return_code;
4628
complete_all(&lreq->notify_finish_wait);
4629
}
4630
} else {
4631
/* CEPH_WATCH_EVENT_NOTIFY */
4632
lwork = lwork_alloc(lreq, do_watch_notify);
4633
if (!lwork) {
4634
pr_err("failed to allocate notify-lwork\n");
4635
goto out_unlock_lreq;
4636
}
4637
4638
lwork->notify.notify_id = notify_id;
4639
lwork->notify.notifier_id = notifier_id;
4640
lwork->notify.payload = payload;
4641
lwork->notify.payload_len = payload_len;
4642
lwork->notify.msg = ceph_msg_get(msg);
4643
lwork_queue(lwork);
4644
}
4645
4646
out_unlock_lreq:
4647
mutex_unlock(&lreq->lock);
4648
out_unlock_osdc:
4649
up_read(&osdc->lock);
4650
return;
4651
4652
bad:
4653
pr_err("osdc handle_watch_notify corrupt msg\n");
4654
}
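/*
 * Summary of the dispatch above: DISCONNECT events record -ENOTCONN and
 * queue the watch error callback; NOTIFY_COMPLETE events (on a notify
 * linger) hand back the optional reply pages and complete the waiter;
 * plain NOTIFY events are bounced to the notify workqueue via lwork so
 * the user callback never runs in messenger context.
 */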
4655
4656
/*
4657
* Register request, send initial attempt.
4658
*/
4659
void ceph_osdc_start_request(struct ceph_osd_client *osdc,
4660
struct ceph_osd_request *req)
4661
{
4662
down_read(&osdc->lock);
4663
submit_request(req, false);
4664
up_read(&osdc->lock);
4665
}
4666
EXPORT_SYMBOL(ceph_osdc_start_request);
4667
4668
/*
4669
* Unregister request. If @req was registered, it isn't completed:
4670
* r_result isn't set and __complete_request() isn't invoked.
4671
*
4672
* If @req wasn't registered, this call may have raced with
4673
* handle_reply(), in which case r_result would already be set and
4674
* __complete_request() would be getting invoked, possibly even
4675
* concurrently with this call.
4676
*/
4677
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4678
{
4679
struct ceph_osd_client *osdc = req->r_osdc;
4680
4681
down_write(&osdc->lock);
4682
if (req->r_osd)
4683
cancel_request(req);
4684
up_write(&osdc->lock);
4685
}
4686
EXPORT_SYMBOL(ceph_osdc_cancel_request);
4687
4688
/*
4689
* @timeout: in jiffies, 0 means "wait forever"
4690
*/
4691
static int wait_request_timeout(struct ceph_osd_request *req,
4692
unsigned long timeout)
4693
{
4694
long left;
4695
4696
dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4697
left = wait_for_completion_killable_timeout(&req->r_completion,
4698
ceph_timeout_jiffies(timeout));
4699
if (left <= 0) {
4700
left = left ?: -ETIMEDOUT;
4701
ceph_osdc_cancel_request(req);
4702
} else {
4703
left = req->r_result; /* completed */
4704
}
4705
4706
return left;
4707
}
4708
4709
/*
4710
* wait for a request to complete
4711
*/
4712
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4713
struct ceph_osd_request *req)
4714
{
4715
return wait_request_timeout(req, 0);
4716
}
4717
EXPORT_SYMBOL(ceph_osdc_wait_request);
4718
4719
/*
4720
* sync - wait for all in-flight requests to flush. avoid starvation.
4721
*/
4722
void ceph_osdc_sync(struct ceph_osd_client *osdc)
4723
{
4724
struct rb_node *n, *p;
4725
u64 last_tid = atomic64_read(&osdc->last_tid);
4726
4727
again:
4728
down_read(&osdc->lock);
4729
for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4730
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4731
4732
mutex_lock(&osd->lock);
4733
for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4734
struct ceph_osd_request *req =
4735
rb_entry(p, struct ceph_osd_request, r_node);
4736
4737
if (req->r_tid > last_tid)
4738
break;
4739
4740
if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4741
continue;
4742
4743
ceph_osdc_get_request(req);
4744
mutex_unlock(&osd->lock);
4745
up_read(&osdc->lock);
4746
dout("%s waiting on req %p tid %llu last_tid %llu\n",
4747
__func__, req, req->r_tid, last_tid);
4748
wait_for_completion(&req->r_completion);
4749
ceph_osdc_put_request(req);
4750
goto again;
4751
}
4752
4753
mutex_unlock(&osd->lock);
4754
}
4755
4756
up_read(&osdc->lock);
4757
dout("%s done last_tid %llu\n", __func__, last_tid);
4758
}
4759
EXPORT_SYMBOL(ceph_osdc_sync);
4760
4761
/*
4762
* Returns a handle, caller owns a ref.
4763
*/
4764
struct ceph_osd_linger_request *
4765
ceph_osdc_watch(struct ceph_osd_client *osdc,
4766
struct ceph_object_id *oid,
4767
struct ceph_object_locator *oloc,
4768
rados_watchcb2_t wcb,
4769
rados_watcherrcb_t errcb,
4770
void *data)
4771
{
4772
struct ceph_osd_linger_request *lreq;
4773
int ret;
4774
4775
lreq = linger_alloc(osdc);
4776
if (!lreq)
4777
return ERR_PTR(-ENOMEM);
4778
4779
lreq->is_watch = true;
4780
lreq->wcb = wcb;
4781
lreq->errcb = errcb;
4782
lreq->data = data;
4783
lreq->watch_valid_thru = jiffies;
4784
4785
ceph_oid_copy(&lreq->t.base_oid, oid);
4786
ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4787
lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4788
ktime_get_real_ts64(&lreq->mtime);
4789
4790
linger_submit(lreq);
4791
ret = linger_reg_commit_wait(lreq);
4792
if (ret) {
4793
linger_cancel(lreq);
4794
goto err_put_lreq;
4795
}
4796
4797
return lreq;
4798
4799
err_put_lreq:
4800
linger_put(lreq);
4801
return ERR_PTR(ret);
4802
}
4803
EXPORT_SYMBOL(ceph_osdc_watch);
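/*
 * Minimal caller-side sketch (hypothetical callback names, error handling
 * elided) showing how a watch is established and torn down with the
 * helpers exported here:
 *
 *	struct ceph_osd_linger_request *handle;
 *
 *	handle = ceph_osdc_watch(osdc, &oid, &oloc, my_watch_cb,
 *				 my_watch_errcb, my_data);
 *	if (IS_ERR(handle))
 *		return PTR_ERR(handle);
 *	...
 *	ret = ceph_osdc_unwatch(osdc, handle);	// releases the ref
 */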
4804
4805
/*
4806
* Releases a ref.
4807
*
4808
* Times out after mount_timeout to preserve rbd unmap behaviour
4809
* introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4810
* with mount_timeout").
4811
*/
4812
int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4813
struct ceph_osd_linger_request *lreq)
4814
{
4815
struct ceph_options *opts = osdc->client->options;
4816
struct ceph_osd_request *req;
4817
int ret;
4818
4819
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4820
if (!req)
4821
return -ENOMEM;
4822
4823
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4824
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4825
req->r_flags = CEPH_OSD_FLAG_WRITE;
4826
ktime_get_real_ts64(&req->r_mtime);
4827
osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4828
lreq->linger_id, 0);
4829
4830
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4831
if (ret)
4832
goto out_put_req;
4833
4834
ceph_osdc_start_request(osdc, req);
4835
linger_cancel(lreq);
4836
linger_put(lreq);
4837
ret = wait_request_timeout(req, opts->mount_timeout);
4838
4839
out_put_req:
4840
ceph_osdc_put_request(req);
4841
return ret;
4842
}
4843
EXPORT_SYMBOL(ceph_osdc_unwatch);
4844
4845
static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4846
u64 notify_id, u64 cookie, void *payload,
4847
u32 payload_len)
4848
{
4849
struct ceph_osd_req_op *op;
4850
struct ceph_pagelist *pl;
4851
int ret;
4852
4853
op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4854
4855
pl = ceph_pagelist_alloc(GFP_NOIO);
4856
if (!pl)
4857
return -ENOMEM;
4858
4859
ret = ceph_pagelist_encode_64(pl, notify_id);
4860
ret |= ceph_pagelist_encode_64(pl, cookie);
4861
if (payload) {
4862
ret |= ceph_pagelist_encode_32(pl, payload_len);
4863
ret |= ceph_pagelist_append(pl, payload, payload_len);
4864
} else {
4865
ret |= ceph_pagelist_encode_32(pl, 0);
4866
}
4867
if (ret) {
4868
ceph_pagelist_release(pl);
4869
return -ENOMEM;
4870
}
4871
4872
ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4873
op->indata_len = pl->length;
4874
return 0;
4875
}
4876
4877
int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4878
struct ceph_object_id *oid,
4879
struct ceph_object_locator *oloc,
4880
u64 notify_id,
4881
u64 cookie,
4882
void *payload,
4883
u32 payload_len)
4884
{
4885
struct ceph_osd_request *req;
4886
int ret;
4887
4888
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4889
if (!req)
4890
return -ENOMEM;
4891
4892
ceph_oid_copy(&req->r_base_oid, oid);
4893
ceph_oloc_copy(&req->r_base_oloc, oloc);
4894
req->r_flags = CEPH_OSD_FLAG_READ;
4895
4896
ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4897
payload_len);
4898
if (ret)
4899
goto out_put_req;
4900
4901
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4902
if (ret)
4903
goto out_put_req;
4904
4905
ceph_osdc_start_request(osdc, req);
4906
ret = ceph_osdc_wait_request(osdc, req);
4907
4908
out_put_req:
4909
ceph_osdc_put_request(req);
4910
return ret;
4911
}
4912
EXPORT_SYMBOL(ceph_osdc_notify_ack);
4913
4914
/*
4915
* @timeout: in seconds
4916
*
4917
* @preply_{pages,len} are initialized both on success and error.
4918
* The caller is responsible for:
4919
*
4920
* ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4921
*/
4922
int ceph_osdc_notify(struct ceph_osd_client *osdc,
4923
struct ceph_object_id *oid,
4924
struct ceph_object_locator *oloc,
4925
void *payload,
4926
u32 payload_len,
4927
u32 timeout,
4928
struct page ***preply_pages,
4929
size_t *preply_len)
4930
{
4931
struct ceph_osd_linger_request *lreq;
4932
int ret;
4933
4934
WARN_ON(!timeout);
4935
if (preply_pages) {
4936
*preply_pages = NULL;
4937
*preply_len = 0;
4938
}
4939
4940
lreq = linger_alloc(osdc);
4941
if (!lreq)
4942
return -ENOMEM;
4943
4944
lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4945
if (!lreq->request_pl) {
4946
ret = -ENOMEM;
4947
goto out_put_lreq;
4948
}
4949
4950
ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4951
ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4952
ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4953
ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4954
if (ret) {
4955
ret = -ENOMEM;
4956
goto out_put_lreq;
4957
}
4958
4959
/* for notify_id */
4960
lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4961
if (IS_ERR(lreq->notify_id_pages)) {
4962
ret = PTR_ERR(lreq->notify_id_pages);
4963
lreq->notify_id_pages = NULL;
4964
goto out_put_lreq;
4965
}
4966
4967
lreq->preply_pages = preply_pages;
4968
lreq->preply_len = preply_len;
4969
4970
ceph_oid_copy(&lreq->t.base_oid, oid);
4971
ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4972
lreq->t.flags = CEPH_OSD_FLAG_READ;
4973
4974
linger_submit(lreq);
4975
ret = linger_reg_commit_wait(lreq);
4976
if (!ret)
4977
ret = linger_notify_finish_wait(lreq,
4978
msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
4979
else
4980
dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4981
4982
linger_cancel(lreq);
4983
out_put_lreq:
4984
linger_put(lreq);
4985
return ret;
4986
}
4987
EXPORT_SYMBOL(ceph_osdc_notify);
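/*
 * Caller-side sketch (illustrative; variable names are hypothetical).
 * Per the comment above, the reply pages must be released whether or not
 * the notify succeeded:
 *
 *	struct page **reply_pages = NULL;
 *	size_t reply_len = 0;
 *
 *	ret = ceph_osdc_notify(osdc, &oid, &oloc, payload, payload_len,
 *			       timeout_secs, &reply_pages, &reply_len);
 *	...
 *	ceph_release_page_vector(reply_pages,
 *				 calc_pages_for(0, reply_len));
 */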
4988
4989
static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4990
{
4991
u8 struct_v;
4992
u32 struct_len;
4993
int ret;
4994
4995
ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4996
&struct_v, &struct_len);
4997
if (ret)
4998
goto bad;
4999
5000
ret = -EINVAL;
5001
ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
5002
ceph_decode_64_safe(p, end, item->cookie, bad);
5003
ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
5004
5005
if (struct_v >= 2) {
5006
ret = ceph_decode_entity_addr(p, end, &item->addr);
5007
if (ret)
5008
goto bad;
5009
} else {
5010
ret = 0;
5011
}
5012
5013
dout("%s %s%llu cookie %llu addr %s\n", __func__,
5014
ENTITY_NAME(item->name), item->cookie,
5015
ceph_pr_addr(&item->addr));
5016
bad:
5017
return ret;
5018
}
5019
5020
static int decode_watchers(void **p, void *end,
5021
struct ceph_watch_item **watchers,
5022
u32 *num_watchers)
5023
{
5024
u8 struct_v;
5025
u32 struct_len;
5026
int i;
5027
int ret;
5028
5029
ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
5030
&struct_v, &struct_len);
5031
if (ret)
5032
return ret;
5033
5034
*num_watchers = ceph_decode_32(p);
5035
*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
5036
if (!*watchers)
5037
return -ENOMEM;
5038
5039
for (i = 0; i < *num_watchers; i++) {
5040
ret = decode_watcher(p, end, *watchers + i);
5041
if (ret) {
5042
kfree(*watchers);
5043
return ret;
5044
}
5045
}
5046
5047
return 0;
5048
}
5049
5050
/*
5051
* On success, the caller is responsible for:
5052
*
5053
* kfree(watchers);
5054
*/
5055
int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5056
struct ceph_object_id *oid,
5057
struct ceph_object_locator *oloc,
5058
struct ceph_watch_item **watchers,
5059
u32 *num_watchers)
5060
{
5061
struct ceph_osd_request *req;
5062
struct page **pages;
5063
int ret;
5064
5065
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5066
if (!req)
5067
return -ENOMEM;
5068
5069
ceph_oid_copy(&req->r_base_oid, oid);
5070
ceph_oloc_copy(&req->r_base_oloc, oloc);
5071
req->r_flags = CEPH_OSD_FLAG_READ;
5072
5073
pages = ceph_alloc_page_vector(1, GFP_NOIO);
5074
if (IS_ERR(pages)) {
5075
ret = PTR_ERR(pages);
5076
goto out_put_req;
5077
}
5078
5079
osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5080
ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5081
response_data),
5082
pages, PAGE_SIZE, 0, false, true);
5083
5084
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5085
if (ret)
5086
goto out_put_req;
5087
5088
ceph_osdc_start_request(osdc, req);
5089
ret = ceph_osdc_wait_request(osdc, req);
5090
if (ret >= 0) {
5091
void *p = page_address(pages[0]);
5092
void *const end = p + req->r_ops[0].outdata_len;
5093
5094
ret = decode_watchers(&p, end, watchers, num_watchers);
5095
}
5096
5097
out_put_req:
5098
ceph_osdc_put_request(req);
5099
return ret;
5100
}
5101
EXPORT_SYMBOL(ceph_osdc_list_watchers);
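/*
 * Caller-side sketch (illustrative): on success the watcher array is
 * allocated on the caller's behalf and must be freed, as noted above.
 *
 *	struct ceph_watch_item *watchers;
 *	u32 num_watchers;
 *
 *	ret = ceph_osdc_list_watchers(osdc, &oid, &oloc, &watchers,
 *				      &num_watchers);
 *	if (!ret) {
 *		...walk watchers[0 .. num_watchers - 1]...
 *		kfree(watchers);
 *	}
 */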
5102
5103
/*
5104
* Call all pending notify callbacks - for use after a watch is
5105
* unregistered, to make sure no more callbacks for it will be invoked
5106
*/
5107
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5108
{
5109
dout("%s osdc %p\n", __func__, osdc);
5110
flush_workqueue(osdc->notify_wq);
5111
}
5112
EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5113
5114
void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5115
{
5116
down_read(&osdc->lock);
5117
maybe_request_map(osdc);
5118
up_read(&osdc->lock);
5119
}
5120
EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5121
5122
/*
5123
* Execute an OSD class method on an object.
5124
*
5125
* @flags: CEPH_OSD_FLAG_*
5126
* @resp_len: in/out param for reply length
5127
*/
5128
int ceph_osdc_call(struct ceph_osd_client *osdc,
5129
struct ceph_object_id *oid,
5130
struct ceph_object_locator *oloc,
5131
const char *class, const char *method,
5132
unsigned int flags,
5133
struct page *req_page, size_t req_len,
5134
struct page **resp_pages, size_t *resp_len)
5135
{
5136
struct ceph_osd_request *req;
5137
int ret;
5138
5139
if (req_len > PAGE_SIZE)
5140
return -E2BIG;
5141
5142
req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5143
if (!req)
5144
return -ENOMEM;
5145
5146
ceph_oid_copy(&req->r_base_oid, oid);
5147
ceph_oloc_copy(&req->r_base_oloc, oloc);
5148
req->r_flags = flags;
5149
5150
ret = osd_req_op_cls_init(req, 0, class, method);
5151
if (ret)
5152
goto out_put_req;
5153
5154
if (req_page)
5155
osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5156
0, false, false);
5157
if (resp_pages)
5158
osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5159
*resp_len, 0, false, false);
5160
5161
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5162
if (ret)
5163
goto out_put_req;
5164
5165
ceph_osdc_start_request(osdc, req);
5166
ret = ceph_osdc_wait_request(osdc, req);
5167
if (ret >= 0) {
5168
ret = req->r_ops[0].rval;
5169
if (resp_pages)
5170
*resp_len = req->r_ops[0].outdata_len;
5171
}
5172
5173
out_put_req:
5174
ceph_osdc_put_request(req);
5175
return ret;
5176
}
5177
EXPORT_SYMBOL(ceph_osdc_call);
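/*
 * Caller-side sketch (illustrative; the class/method strings are examples
 * only): a single optional request page in, a single response page out.
 *
 *	struct page *reply_page;
 *	size_t reply_len = PAGE_SIZE;
 *
 *	reply_page = alloc_page(GFP_NOIO);
 *	...
 *	ret = ceph_osdc_call(osdc, &oid, &oloc, "lock", "get_info",
 *			     CEPH_OSD_FLAG_READ, NULL, 0,
 *			     &reply_page, &reply_len);
 *	// on success, ret is r_ops[0].rval and reply_len is the outdata_len
 */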
5178
5179
/*
5180
* reset all osd connections
5181
*/
5182
void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5183
{
5184
struct rb_node *n;
5185
5186
down_write(&osdc->lock);
5187
for (n = rb_first(&osdc->osds); n; ) {
5188
struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5189
5190
n = rb_next(n);
5191
if (!reopen_osd(osd))
5192
kick_osd_requests(osd);
5193
}
5194
up_write(&osdc->lock);
5195
}
5196
5197
/*
5198
* init, shutdown
5199
*/
5200
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5201
{
5202
int err;
5203
5204
dout("init\n");
5205
osdc->client = client;
5206
init_rwsem(&osdc->lock);
5207
osdc->osds = RB_ROOT;
5208
INIT_LIST_HEAD(&osdc->osd_lru);
5209
spin_lock_init(&osdc->osd_lru_lock);
5210
osd_init(&osdc->homeless_osd);
5211
osdc->homeless_osd.o_osdc = osdc;
5212
osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5213
osdc->last_linger_id = CEPH_LINGER_ID_START;
5214
osdc->linger_requests = RB_ROOT;
5215
osdc->map_checks = RB_ROOT;
5216
osdc->linger_map_checks = RB_ROOT;
5217
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5218
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5219
5220
err = -ENOMEM;
5221
osdc->osdmap = ceph_osdmap_alloc();
5222
if (!osdc->osdmap)
5223
goto out;
5224
5225
osdc->req_mempool = mempool_create_slab_pool(10,
5226
ceph_osd_request_cache);
5227
if (!osdc->req_mempool)
5228
goto out_map;
5229
5230
err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5231
PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5232
if (err < 0)
5233
goto out_mempool;
5234
err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5235
PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5236
"osd_op_reply");
5237
if (err < 0)
5238
goto out_msgpool;
5239
5240
err = -ENOMEM;
5241
osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5242
if (!osdc->notify_wq)
5243
goto out_msgpool_reply;
5244
5245
osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5246
if (!osdc->completion_wq)
5247
goto out_notify_wq;
5248
5249
schedule_delayed_work(&osdc->timeout_work,
5250
osdc->client->options->osd_keepalive_timeout);
5251
schedule_delayed_work(&osdc->osds_timeout_work,
5252
round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5253
5254
return 0;
5255
5256
out_notify_wq:
5257
destroy_workqueue(osdc->notify_wq);
5258
out_msgpool_reply:
5259
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5260
out_msgpool:
5261
ceph_msgpool_destroy(&osdc->msgpool_op);
5262
out_mempool:
5263
mempool_destroy(osdc->req_mempool);
5264
out_map:
5265
ceph_osdmap_destroy(osdc->osdmap);
5266
out:
5267
return err;
5268
}
5269
5270
void ceph_osdc_stop(struct ceph_osd_client *osdc)
5271
{
5272
destroy_workqueue(osdc->completion_wq);
5273
destroy_workqueue(osdc->notify_wq);
5274
cancel_delayed_work_sync(&osdc->timeout_work);
5275
cancel_delayed_work_sync(&osdc->osds_timeout_work);
5276
5277
down_write(&osdc->lock);
5278
while (!RB_EMPTY_ROOT(&osdc->osds)) {
5279
struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5280
struct ceph_osd, o_node);
5281
close_osd(osd);
5282
}
5283
up_write(&osdc->lock);
5284
WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5285
osd_cleanup(&osdc->homeless_osd);
5286
5287
WARN_ON(!list_empty(&osdc->osd_lru));
5288
WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5289
WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5290
WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5291
WARN_ON(atomic_read(&osdc->num_requests));
5292
WARN_ON(atomic_read(&osdc->num_homeless));
5293
5294
ceph_osdmap_destroy(osdc->osdmap);
5295
mempool_destroy(osdc->req_mempool);
5296
ceph_msgpool_destroy(&osdc->msgpool_op);
5297
ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5298
}
5299
5300
int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5301
u64 src_snapid, u64 src_version,
5302
struct ceph_object_id *src_oid,
5303
struct ceph_object_locator *src_oloc,
5304
u32 src_fadvise_flags,
5305
u32 dst_fadvise_flags,
5306
u32 truncate_seq, u64 truncate_size,
5307
u8 copy_from_flags)
5308
{
5309
struct ceph_osd_req_op *op;
5310
struct page **pages;
5311
void *p, *end;
5312
5313
pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5314
if (IS_ERR(pages))
5315
return PTR_ERR(pages);
5316
5317
op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5318
dst_fadvise_flags);
5319
op->copy_from.snapid = src_snapid;
5320
op->copy_from.src_version = src_version;
5321
op->copy_from.flags = copy_from_flags;
5322
op->copy_from.src_fadvise_flags = src_fadvise_flags;
5323
5324
p = page_address(pages[0]);
5325
end = p + PAGE_SIZE;
5326
ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5327
encode_oloc(&p, end, src_oloc);
5328
ceph_encode_32(&p, truncate_seq);
5329
ceph_encode_64(&p, truncate_size);
5330
op->indata_len = PAGE_SIZE - (end - p);
5331
5332
ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5333
op->indata_len, 0, false, true);
5334
return 0;
5335
}
5336
EXPORT_SYMBOL(osd_req_op_copy_from_init);
5337
5338
int __init ceph_osdc_setup(void)
5339
{
5340
size_t size = sizeof(struct ceph_osd_request) +
5341
CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5342
5343
BUG_ON(ceph_osd_request_cache);
5344
ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5345
0, 0, NULL);
5346
5347
return ceph_osd_request_cache ? 0 : -ENOMEM;
5348
}
5349
5350
void ceph_osdc_cleanup(void)
5351
{
5352
BUG_ON(!ceph_osd_request_cache);
5353
kmem_cache_destroy(ceph_osd_request_cache);
5354
ceph_osd_request_cache = NULL;
5355
}
5356
5357
/*
5358
* handle incoming message
5359
*/
5360
static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5361
{
5362
struct ceph_osd *osd = con->private;
5363
struct ceph_osd_client *osdc = osd->o_osdc;
5364
int type = le16_to_cpu(msg->hdr.type);
5365
5366
switch (type) {
5367
case CEPH_MSG_OSD_MAP:
5368
ceph_osdc_handle_map(osdc, msg);
5369
break;
5370
case CEPH_MSG_OSD_OPREPLY:
5371
handle_reply(osd, msg);
5372
break;
5373
case CEPH_MSG_OSD_BACKOFF:
5374
handle_backoff(osd, msg);
5375
break;
5376
case CEPH_MSG_WATCH_NOTIFY:
5377
handle_watch_notify(osdc, msg);
5378
break;
5379
5380
default:
5381
pr_err("received unknown message type %d %s\n", type,
5382
ceph_msg_type_name(type));
5383
}
5384
5385
ceph_msg_put(msg);
5386
}
5387
5388
/* How much sparse data was requested? */
5389
static u64 sparse_data_requested(struct ceph_osd_request *req)
5390
{
5391
u64 len = 0;
5392
5393
if (req->r_flags & CEPH_OSD_FLAG_READ) {
5394
int i;
5395
5396
for (i = 0; i < req->r_num_ops; ++i) {
5397
struct ceph_osd_req_op *op = &req->r_ops[i];
5398
5399
if (op->op == CEPH_OSD_OP_SPARSE_READ)
5400
len += op->extent.length;
5401
}
5402
}
5403
return len;
5404
}
5405
5406
/*
5407
* Lookup and return message for incoming reply. Don't try to do
5408
* anything about a larger than preallocated data portion of the
5409
* message at the moment - for now, just skip the message.
5410
*/
5411
static struct ceph_msg *get_reply(struct ceph_connection *con,
5412
struct ceph_msg_header *hdr,
5413
int *skip)
5414
{
5415
struct ceph_osd *osd = con->private;
5416
struct ceph_osd_client *osdc = osd->o_osdc;
5417
struct ceph_msg *m = NULL;
5418
struct ceph_osd_request *req;
5419
int front_len = le32_to_cpu(hdr->front_len);
5420
int data_len = le32_to_cpu(hdr->data_len);
5421
u64 tid = le64_to_cpu(hdr->tid);
5422
u64 srlen;
5423
5424
down_read(&osdc->lock);
5425
if (!osd_registered(osd)) {
5426
dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5427
*skip = 1;
5428
goto out_unlock_osdc;
5429
}
5430
WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5431
5432
mutex_lock(&osd->lock);
5433
req = lookup_request(&osd->o_requests, tid);
5434
if (!req) {
5435
dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5436
osd->o_osd, tid);
5437
*skip = 1;
5438
goto out_unlock_session;
5439
}
5440
5441
ceph_msg_revoke_incoming(req->r_reply);
5442
5443
if (front_len > req->r_reply->front_alloc_len) {
5444
pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5445
__func__, osd->o_osd, req->r_tid, front_len,
5446
req->r_reply->front_alloc_len);
5447
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5448
false);
5449
if (!m)
5450
goto out_unlock_session;
5451
ceph_msg_put(req->r_reply);
5452
req->r_reply = m;
5453
}
5454
5455
srlen = sparse_data_requested(req);
5456
if (!srlen && data_len > req->r_reply->data_length) {
5457
pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5458
__func__, osd->o_osd, req->r_tid, data_len,
5459
req->r_reply->data_length);
5460
m = NULL;
5461
*skip = 1;
5462
goto out_unlock_session;
5463
}
5464
5465
m = ceph_msg_get(req->r_reply);
5466
m->sparse_read_total = srlen;
5467
5468
dout("get_reply tid %lld %p\n", tid, m);
5469
5470
out_unlock_session:
5471
mutex_unlock(&osd->lock);
5472
out_unlock_osdc:
5473
up_read(&osdc->lock);
5474
return m;
5475
}
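/*
 * To restate the policy above: the reply message was preallocated when
 * the request was submitted.  A larger-than-expected front is tolerated
 * by swapping in a bigger message; a larger-than-expected data payload
 * (for non-sparse reads) is not, and the incoming message is skipped
 * instead of risking an overrun of the preallocated data buffers.
 */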
5476
5477
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5478
{
5479
struct ceph_msg *m;
5480
int type = le16_to_cpu(hdr->type);
5481
u32 front_len = le32_to_cpu(hdr->front_len);
5482
u32 data_len = le32_to_cpu(hdr->data_len);
5483
5484
m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5485
if (!m)
5486
return NULL;
5487
5488
if (data_len) {
5489
struct page **pages;
5490
5491
pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5492
GFP_NOIO);
5493
if (IS_ERR(pages)) {
5494
ceph_msg_put(m);
5495
return NULL;
5496
}
5497
5498
ceph_msg_data_add_pages(m, pages, data_len, 0, true);
5499
}
5500
5501
return m;
5502
}
5503
5504
static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
5505
struct ceph_msg_header *hdr,
5506
int *skip)
5507
{
5508
struct ceph_osd *osd = con->private;
5509
int type = le16_to_cpu(hdr->type);
5510
5511
*skip = 0;
5512
switch (type) {
5513
case CEPH_MSG_OSD_MAP:
5514
case CEPH_MSG_OSD_BACKOFF:
5515
case CEPH_MSG_WATCH_NOTIFY:
5516
return alloc_msg_with_page_vector(hdr);
5517
case CEPH_MSG_OSD_OPREPLY:
5518
return get_reply(con, hdr, skip);
5519
default:
5520
pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5521
osd->o_osd, type);
5522
*skip = 1;
5523
return NULL;
5524
}
5525
}
5526
5527
/*
5528
* Wrappers to refcount containing ceph_osd struct
5529
*/
5530
static struct ceph_connection *osd_get_con(struct ceph_connection *con)
5531
{
5532
struct ceph_osd *osd = con->private;
5533
if (get_osd(osd))
5534
return con;
5535
return NULL;
5536
}
5537
5538
static void osd_put_con(struct ceph_connection *con)
5539
{
5540
struct ceph_osd *osd = con->private;
5541
put_osd(osd);
5542
}
5543
5544
/*
5545
* authentication
5546
*/
5547
5548
/*
5549
* Note: returned pointer is the address of a structure that's
5550
* managed separately. Caller must *not* attempt to free it.
5551
*/
5552
static struct ceph_auth_handshake *
5553
osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5554
{
5555
struct ceph_osd *o = con->private;
5556
struct ceph_osd_client *osdc = o->o_osdc;
5557
struct ceph_auth_client *ac = osdc->client->monc.auth;
5558
struct ceph_auth_handshake *auth = &o->o_auth;
5559
int ret;
5560
5561
ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5562
force_new, proto, NULL, NULL);
5563
if (ret)
5564
return ERR_PTR(ret);
5565
5566
return auth;
5567
}
5568
5569
static int osd_add_authorizer_challenge(struct ceph_connection *con,
5570
void *challenge_buf, int challenge_buf_len)
5571
{
5572
struct ceph_osd *o = con->private;
5573
struct ceph_osd_client *osdc = o->o_osdc;
5574
struct ceph_auth_client *ac = osdc->client->monc.auth;
5575
5576
return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
5577
challenge_buf, challenge_buf_len);
5578
}
5579
5580
static int osd_verify_authorizer_reply(struct ceph_connection *con)
5581
{
5582
struct ceph_osd *o = con->private;
5583
struct ceph_osd_client *osdc = o->o_osdc;
5584
struct ceph_auth_client *ac = osdc->client->monc.auth;
5585
struct ceph_auth_handshake *auth = &o->o_auth;
5586
5587
return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5588
auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5589
NULL, NULL, NULL, NULL);
5590
}
5591
5592
static int osd_invalidate_authorizer(struct ceph_connection *con)
5593
{
5594
struct ceph_osd *o = con->private;
5595
struct ceph_osd_client *osdc = o->o_osdc;
5596
struct ceph_auth_client *ac = osdc->client->monc.auth;
5597
5598
ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5599
return ceph_monc_validate_auth(&osdc->client->monc);
5600
}
5601
5602
static int osd_get_auth_request(struct ceph_connection *con,
5603
void *buf, int *buf_len,
5604
void **authorizer, int *authorizer_len)
5605
{
5606
struct ceph_osd *o = con->private;
5607
struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5608
struct ceph_auth_handshake *auth = &o->o_auth;
5609
int ret;
5610
5611
ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5612
buf, buf_len);
5613
if (ret)
5614
return ret;
5615
5616
*authorizer = auth->authorizer_buf;
5617
*authorizer_len = auth->authorizer_buf_len;
5618
return 0;
5619
}
5620
5621
static int osd_handle_auth_reply_more(struct ceph_connection *con,
5622
void *reply, int reply_len,
5623
void *buf, int *buf_len,
5624
void **authorizer, int *authorizer_len)
5625
{
5626
struct ceph_osd *o = con->private;
5627
struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5628
struct ceph_auth_handshake *auth = &o->o_auth;
5629
int ret;
5630
5631
ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5632
buf, buf_len);
5633
if (ret)
5634
return ret;
5635
5636
*authorizer = auth->authorizer_buf;
5637
*authorizer_len = auth->authorizer_buf_len;
5638
return 0;
5639
}
5640
5641
static int osd_handle_auth_done(struct ceph_connection *con,
5642
u64 global_id, void *reply, int reply_len,
5643
u8 *session_key, int *session_key_len,
5644
u8 *con_secret, int *con_secret_len)
5645
{
5646
struct ceph_osd *o = con->private;
5647
struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5648
struct ceph_auth_handshake *auth = &o->o_auth;
5649
5650
return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5651
session_key, session_key_len,
5652
con_secret, con_secret_len);
5653
}
5654
5655
static int osd_handle_auth_bad_method(struct ceph_connection *con,
5656
int used_proto, int result,
5657
const int *allowed_protos, int proto_cnt,
5658
const int *allowed_modes, int mode_cnt)
5659
{
5660
struct ceph_osd *o = con->private;
5661
struct ceph_mon_client *monc = &o->o_osdc->client->monc;
5662
int ret;
5663
5664
if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD,
5665
used_proto, result,
5666
allowed_protos, proto_cnt,
5667
allowed_modes, mode_cnt)) {
5668
ret = ceph_monc_validate_auth(monc);
5669
if (ret)
5670
return ret;
5671
}
5672
5673
return -EACCES;
5674
}
5675
5676
static void osd_reencode_message(struct ceph_msg *msg)
5677
{
5678
int type = le16_to_cpu(msg->hdr.type);
5679
5680
if (type == CEPH_MSG_OSD_OP)
5681
encode_request_finish(msg);
5682
}
5683
5684
static int osd_sign_message(struct ceph_msg *msg)
5685
{
5686
struct ceph_osd *o = msg->con->private;
5687
struct ceph_auth_handshake *auth = &o->o_auth;
5688
5689
return ceph_auth_sign_message(auth, msg);
5690
}
5691
5692
static int osd_check_message_signature(struct ceph_msg *msg)
5693
{
5694
struct ceph_osd *o = msg->con->private;
5695
struct ceph_auth_handshake *auth = &o->o_auth;
5696
5697
return ceph_auth_check_message_signature(auth, msg);
5698
}
5699
5700
static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len,
5701
bool zero)
5702
{
5703
while (len) {
5704
struct page *page;
5705
size_t poff, plen;
5706
5707
page = ceph_msg_data_next(cursor, &poff, &plen);
5708
if (plen > len)
5709
plen = len;
5710
if (zero)
5711
zero_user_segment(page, poff, poff + plen);
5712
len -= plen;
5713
ceph_msg_data_advance(cursor, plen);
5714
}
5715
}
5716
5717
static int prep_next_sparse_read(struct ceph_connection *con,
				 struct ceph_msg_data_cursor *cursor)
{
	struct ceph_osd *o = con->private;
	struct ceph_sparse_read *sr = &o->o_sparse_read;
	struct ceph_osd_request *req;
	struct ceph_osd_req_op *op;

	spin_lock(&o->o_requests_lock);
	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
	if (!req) {
		spin_unlock(&o->o_requests_lock);
		return -EBADR;
	}

	if (o->o_sparse_op_idx < 0) {
		dout("%s: [%d] starting new sparse read req\n",
		     __func__, o->o_osd);
	} else {
		u64 end;

		op = &req->r_ops[o->o_sparse_op_idx];

		WARN_ON_ONCE(op->extent.sparse_ext);

		/* hand back buffer we took earlier */
		op->extent.sparse_ext = sr->sr_extent;
		sr->sr_extent = NULL;
		op->extent.sparse_ext_cnt = sr->sr_count;
		sr->sr_ext_len = 0;
		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
		     __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid);
		/* Advance to end of data for this operation */
		end = ceph_sparse_ext_map_end(op);
		if (end < sr->sr_req_len)
			advance_cursor(cursor, sr->sr_req_len - end, false);
	}

	ceph_init_sparse_read(sr);

	/* find next op in this request (if any) */
	while (++o->o_sparse_op_idx < req->r_num_ops) {
		op = &req->r_ops[o->o_sparse_op_idx];
		if (op->op == CEPH_OSD_OP_SPARSE_READ)
			goto found;
	}

	/* reset for next sparse read request */
	spin_unlock(&o->o_requests_lock);
	o->o_sparse_op_idx = -1;
	return 0;
found:
	sr->sr_req_off = op->extent.offset;
	sr->sr_req_len = op->extent.length;
	sr->sr_pos = sr->sr_req_off;
	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
	     o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);

	/* hand off request's sparse extent map buffer */
	sr->sr_ext_len = op->extent.sparse_ext_cnt;
	op->extent.sparse_ext_cnt = 0;
	sr->sr_extent = op->extent.sparse_ext;
	op->extent.sparse_ext = NULL;

	spin_unlock(&o->o_requests_lock);
	return 1;
}

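/*
 * The extent map is received in little-endian on-wire format; convert it
 * to host endianness in place.  This is a no-op on little-endian hosts.
 */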
#ifdef __BIG_ENDIAN
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
	int i;

	for (i = 0; i < sr->sr_count; i++) {
		struct ceph_sparse_extent *ext = &sr->sr_extent[i];

		ext->off = le64_to_cpu((__force __le64)ext->off);
		ext->len = le64_to_cpu((__force __le64)ext->len);
	}
}
#else
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
}
#endif

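/*
 * Sparse-read state machine, called by the messenger as reply data for a
 * sparse-read op arrives.  As handled by the states below, the reply body
 * consists of:
 *
 *	le32 extent count
 *	{ le64 offset, le64 length } * count	(the extent map)
 *	le32 data length
 *	extent data, in extent-map order
 *
 * Each call tells the caller where to put the next chunk: a positive
 * return value is the number of bytes to receive, with *pbuf pointing at
 * the destination buffer, or with *pbuf == NULL when the data should be
 * landed via the cursor.  0 means this message is done; negative values
 * are errors.
 */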
static int osd_sparse_read(struct ceph_connection *con,
			   struct ceph_msg_data_cursor *cursor,
			   char **pbuf)
{
	struct ceph_osd *o = con->private;
	struct ceph_sparse_read *sr = &o->o_sparse_read;
	u32 count = sr->sr_count;
	u64 eoff, elen, len = 0;
	int i, ret;

	switch (sr->sr_state) {
	case CEPH_SPARSE_READ_HDR:
next_op:
		ret = prep_next_sparse_read(con, cursor);
		if (ret <= 0)
			return ret;

		/* number of extents */
		ret = sizeof(sr->sr_count);
		*pbuf = (char *)&sr->sr_count;
		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
		break;
	case CEPH_SPARSE_READ_EXTENTS:
		/* Convert sr_count to host-endian */
		count = le32_to_cpu((__force __le32)sr->sr_count);
		sr->sr_count = count;
		dout("[%d] got %u extents\n", o->o_osd, count);

		if (count > 0) {
			if (!sr->sr_extent || count > sr->sr_ext_len) {
				/* no extent array provided, or too short */
				kfree(sr->sr_extent);
				sr->sr_extent = kmalloc_array(count,
							      sizeof(*sr->sr_extent),
							      GFP_NOIO);
				if (!sr->sr_extent) {
					pr_err("%s: failed to allocate %u extents\n",
					       __func__, count);
					return -ENOMEM;
				}
				sr->sr_ext_len = count;
			}
			ret = count * sizeof(*sr->sr_extent);
			*pbuf = (char *)sr->sr_extent;
			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
			break;
		}
		/* No extents? Read data len */
		fallthrough;
	case CEPH_SPARSE_READ_DATA_LEN:
		convert_extent_map(sr);
		ret = sizeof(sr->sr_datalen);
		*pbuf = (char *)&sr->sr_datalen;
		sr->sr_state = CEPH_SPARSE_READ_DATA_PRE;
		break;
	case CEPH_SPARSE_READ_DATA_PRE:
		/* Convert sr_datalen to host-endian */
		sr->sr_datalen = le32_to_cpu((__force __le32)sr->sr_datalen);
		for (i = 0; i < count; i++)
			len += sr->sr_extent[i].len;
		if (sr->sr_datalen != len) {
			pr_warn_ratelimited("data len %u != extent len %llu\n",
					    sr->sr_datalen, len);
			return -EREMOTEIO;
		}
		sr->sr_state = CEPH_SPARSE_READ_DATA;
		fallthrough;
	case CEPH_SPARSE_READ_DATA:
		if (sr->sr_index >= count) {
			sr->sr_state = CEPH_SPARSE_READ_HDR;
			goto next_op;
		}

		eoff = sr->sr_extent[sr->sr_index].off;
		elen = sr->sr_extent[sr->sr_index].len;

		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
		     o->o_osd, sr->sr_index, eoff, elen);

		if (elen > INT_MAX) {
			dout("Sparse read extent length too long (0x%llx)\n",
			     elen);
			return -EREMOTEIO;
		}

		/* zero out anything from sr_pos to start of extent */
		if (sr->sr_pos < eoff)
			advance_cursor(cursor, eoff - sr->sr_pos, true);

		/* Set position to end of extent */
		sr->sr_pos = eoff + elen;

		/* send back the new length and nullify the ptr */
		cursor->sr_resid = elen;
		ret = elen;
		*pbuf = NULL;

		/* Bump the array index */
		++sr->sr_index;
		break;
	}
	return ret;
}

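/*
 * Connection callbacks used by the messenger for OSD sessions.
 */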
static const struct ceph_connection_operations osd_con_ops = {
	.get = osd_get_con,
	.put = osd_put_con,
	.sparse_read = osd_sparse_read,
	.alloc_msg = osd_alloc_msg,
	.dispatch = osd_dispatch,
	.fault = osd_fault,
	.reencode_message = osd_reencode_message,
	.get_authorizer = osd_get_authorizer,
	.add_authorizer_challenge = osd_add_authorizer_challenge,
	.verify_authorizer_reply = osd_verify_authorizer_reply,
	.invalidate_authorizer = osd_invalidate_authorizer,
	.sign_message = osd_sign_message,
	.check_message_signature = osd_check_message_signature,
	.get_auth_request = osd_get_auth_request,
	.handle_auth_reply_more = osd_handle_auth_reply_more,
	.handle_auth_done = osd_handle_auth_done,
	.handle_auth_bad_method = osd_handle_auth_bad_method,
};