Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/net/ceph/mon_client.c
26278 views
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
3
4
#include <linux/module.h>
5
#include <linux/types.h>
6
#include <linux/slab.h>
7
#include <linux/random.h>
8
#include <linux/sched.h>
9
10
#include <linux/ceph/ceph_features.h>
11
#include <linux/ceph/mon_client.h>
12
#include <linux/ceph/libceph.h>
13
#include <linux/ceph/debugfs.h>
14
#include <linux/ceph/decode.h>
15
#include <linux/ceph/auth.h>
16
17
/*
18
* Interact with Ceph monitor cluster. Handle requests for new map
19
* versions, and periodically resend as needed. Also implement
20
* statfs() and umount().
21
*
22
* A small cluster of Ceph "monitors" are responsible for managing critical
23
* cluster configuration and state information. An odd number (e.g., 3, 5)
24
* of ceph-mon daemons use a modified version of the Paxos part-time parliament
25
* algorithm to manage the MDS map (mds cluster membership), OSD map, and
26
* list of clients who have mounted the file system.
27
*
28
* We maintain an open, active session with a monitor at all times in order to
29
* receive timely MDSMap updates. We periodically send a keepalive byte on the
30
* TCP socket to ensure we detect a failure. If the connection does break, we
31
* randomly hunt for a new monitor. Once the connection is reestablished, we
32
* resend any outstanding requests.
33
*/
34
35
/*
 * Connection callbacks for the monitor connection; passed to ceph_con_init()
 * in ceph_monc_init().  The definition is not in the visible part of this
 * file — presumably further down.
 */
static const struct ceph_connection_operations mon_con_ops;

/*
 * Forward declaration: __validate_auth() is called from delayed_work()
 * before (or outside) the visible portion where it is defined.
 */
static int __validate_auth(struct ceph_mon_client *monc);
38
39
static int decode_mon_info(void **p, void *end, bool msgr2,
40
struct ceph_entity_addr *addr)
41
{
42
void *mon_info_end;
43
u32 struct_len;
44
u8 struct_v;
45
int ret;
46
47
ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
48
&struct_len);
49
if (ret)
50
return ret;
51
52
mon_info_end = *p + struct_len;
53
ceph_decode_skip_string(p, end, e_inval); /* skip mon name */
54
ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
55
if (ret)
56
return ret;
57
58
*p = mon_info_end;
59
return 0;
60
61
e_inval:
62
return -EINVAL;
63
}
64
65
/*
66
* Decode a monmap blob (e.g., during mount).
67
*
68
* Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
69
*/
70
static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
71
{
72
struct ceph_monmap *monmap = NULL;
73
struct ceph_fsid fsid;
74
u32 struct_len;
75
int blob_len;
76
int num_mon;
77
u8 struct_v;
78
u32 epoch;
79
int ret;
80
int i;
81
82
ceph_decode_32_safe(p, end, blob_len, e_inval);
83
ceph_decode_need(p, end, blob_len, e_inval);
84
85
ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
86
if (ret)
87
goto fail;
88
89
dout("%s struct_v %d\n", __func__, struct_v);
90
ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
91
ceph_decode_32_safe(p, end, epoch, e_inval);
92
if (struct_v >= 6) {
93
u32 feat_struct_len;
94
u8 feat_struct_v;
95
96
*p += sizeof(struct ceph_timespec); /* skip last_changed */
97
*p += sizeof(struct ceph_timespec); /* skip created */
98
99
ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
100
&feat_struct_v, &feat_struct_len);
101
if (ret)
102
goto fail;
103
104
*p += feat_struct_len; /* skip persistent_features */
105
106
ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
107
&feat_struct_v, &feat_struct_len);
108
if (ret)
109
goto fail;
110
111
*p += feat_struct_len; /* skip optional_features */
112
}
113
ceph_decode_32_safe(p, end, num_mon, e_inval);
114
115
dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
116
num_mon);
117
if (num_mon > CEPH_MAX_MON)
118
goto e_inval;
119
120
monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
121
if (!monmap) {
122
ret = -ENOMEM;
123
goto fail;
124
}
125
monmap->fsid = fsid;
126
monmap->epoch = epoch;
127
monmap->num_mon = num_mon;
128
129
/* legacy_mon_addr map or mon_info map */
130
for (i = 0; i < num_mon; i++) {
131
struct ceph_entity_inst *inst = &monmap->mon_inst[i];
132
133
ceph_decode_skip_string(p, end, e_inval); /* skip mon name */
134
inst->name.type = CEPH_ENTITY_TYPE_MON;
135
inst->name.num = cpu_to_le64(i);
136
137
if (struct_v >= 6)
138
ret = decode_mon_info(p, end, msgr2, &inst->addr);
139
else
140
ret = ceph_decode_entity_addr(p, end, &inst->addr);
141
if (ret)
142
goto fail;
143
144
dout("%s mon%d addr %s\n", __func__, i,
145
ceph_pr_addr(&inst->addr));
146
}
147
148
return monmap;
149
150
e_inval:
151
ret = -EINVAL;
152
fail:
153
kfree(monmap);
154
return ERR_PTR(ret);
155
}
156
157
/*
158
* return true if *addr is included in the monmap.
159
*/
160
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
161
{
162
int i;
163
164
for (i = 0; i < m->num_mon; i++) {
165
if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
166
return 1;
167
}
168
169
return 0;
170
}
171
172
/*
173
* Send an auth request.
174
*/
175
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
176
{
177
monc->pending_auth = 1;
178
monc->m_auth->front.iov_len = len;
179
monc->m_auth->hdr.front_len = cpu_to_le32(len);
180
ceph_msg_revoke(monc->m_auth);
181
ceph_msg_get(monc->m_auth); /* keep our ref */
182
ceph_con_send(&monc->con, monc->m_auth);
183
}
184
185
/*
186
* Close monitor session, if any.
187
*/
188
static void __close_session(struct ceph_mon_client *monc)
189
{
190
dout("__close_session closing mon%d\n", monc->cur_mon);
191
ceph_msg_revoke(monc->m_auth);
192
ceph_msg_revoke_incoming(monc->m_auth_reply);
193
ceph_msg_revoke(monc->m_subscribe);
194
ceph_msg_revoke_incoming(monc->m_subscribe_ack);
195
ceph_con_close(&monc->con);
196
197
monc->pending_auth = 0;
198
ceph_auth_reset(monc->auth);
199
}
200
201
/*
202
* Pick a new monitor at random and set cur_mon. If we are repicking
203
* (i.e. cur_mon is already set), be sure to pick a different one.
204
*/
205
static void pick_new_mon(struct ceph_mon_client *monc)
206
{
207
int old_mon = monc->cur_mon;
208
209
BUG_ON(monc->monmap->num_mon < 1);
210
211
if (monc->monmap->num_mon == 1) {
212
monc->cur_mon = 0;
213
} else {
214
int max = monc->monmap->num_mon;
215
int o = -1;
216
int n;
217
218
if (monc->cur_mon >= 0) {
219
if (monc->cur_mon < monc->monmap->num_mon)
220
o = monc->cur_mon;
221
if (o >= 0)
222
max--;
223
}
224
225
n = get_random_u32_below(max);
226
if (o >= 0 && n >= o)
227
n++;
228
229
monc->cur_mon = n;
230
}
231
232
dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
233
monc->cur_mon, monc->monmap->num_mon);
234
}
235
236
/*
237
* Open a session with a new monitor.
238
*/
239
static void __open_session(struct ceph_mon_client *monc)
240
{
241
int ret;
242
243
pick_new_mon(monc);
244
245
monc->hunting = true;
246
if (monc->had_a_connection) {
247
monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
248
if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
249
monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
250
}
251
252
monc->sub_renew_after = jiffies; /* i.e., expired */
253
monc->sub_renew_sent = 0;
254
255
dout("%s opening mon%d\n", __func__, monc->cur_mon);
256
ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
257
&monc->monmap->mon_inst[monc->cur_mon].addr);
258
259
/*
260
* Queue a keepalive to ensure that in case of an early fault
261
* the messenger doesn't put us into STANDBY state and instead
262
* retries. This also ensures that our timestamp is valid by
263
* the time we finish hunting and delayed_work() checks it.
264
*/
265
ceph_con_keepalive(&monc->con);
266
if (ceph_msgr2(monc->client)) {
267
monc->pending_auth = 1;
268
return;
269
}
270
271
/* initiate authentication handshake */
272
ret = ceph_auth_build_hello(monc->auth,
273
monc->m_auth->front.iov_base,
274
monc->m_auth->front_alloc_len);
275
BUG_ON(ret <= 0);
276
__send_prepared_auth_request(monc, ret);
277
}
278
279
static void reopen_session(struct ceph_mon_client *monc)
280
{
281
if (!monc->hunting)
282
pr_info("mon%d %s session lost, hunting for new mon\n",
283
monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));
284
285
__close_session(monc);
286
__open_session(monc);
287
}
288
289
void ceph_monc_reopen_session(struct ceph_mon_client *monc)
290
{
291
mutex_lock(&monc->mutex);
292
reopen_session(monc);
293
mutex_unlock(&monc->mutex);
294
}
295
296
static void un_backoff(struct ceph_mon_client *monc)
297
{
298
monc->hunt_mult /= 2; /* reduce by 50% */
299
if (monc->hunt_mult < 1)
300
monc->hunt_mult = 1;
301
dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
302
}
303
304
/*
305
* Reschedule delayed work timer.
306
*/
307
static void __schedule_delayed(struct ceph_mon_client *monc)
308
{
309
unsigned long delay;
310
311
if (monc->hunting)
312
delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
313
else
314
delay = CEPH_MONC_PING_INTERVAL;
315
316
dout("__schedule_delayed after %lu\n", delay);
317
mod_delayed_work(system_wq, &monc->delayed_work,
318
round_jiffies_relative(delay));
319
}
320
321
const char *ceph_sub_str[] = {
322
[CEPH_SUB_MONMAP] = "monmap",
323
[CEPH_SUB_OSDMAP] = "osdmap",
324
[CEPH_SUB_FSMAP] = "fsmap.user",
325
[CEPH_SUB_MDSMAP] = "mdsmap",
326
};
327
328
/*
329
* Send subscribe request for one or more maps, according to
330
* monc->subs.
331
*/
332
static void __send_subscribe(struct ceph_mon_client *monc)
333
{
334
struct ceph_msg *msg = monc->m_subscribe;
335
void *p = msg->front.iov_base;
336
void *const end = p + msg->front_alloc_len;
337
int num = 0;
338
int i;
339
340
dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
341
342
BUG_ON(monc->cur_mon < 0);
343
344
if (!monc->sub_renew_sent)
345
monc->sub_renew_sent = jiffies | 1; /* never 0 */
346
347
msg->hdr.version = cpu_to_le16(2);
348
349
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
350
if (monc->subs[i].want)
351
num++;
352
}
353
BUG_ON(num < 1); /* monmap sub is always there */
354
ceph_encode_32(&p, num);
355
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
356
char buf[32];
357
int len;
358
359
if (!monc->subs[i].want)
360
continue;
361
362
len = sprintf(buf, "%s", ceph_sub_str[i]);
363
if (i == CEPH_SUB_MDSMAP &&
364
monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
365
len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
366
367
dout("%s %s start %llu flags 0x%x\n", __func__, buf,
368
le64_to_cpu(monc->subs[i].item.start),
369
monc->subs[i].item.flags);
370
ceph_encode_string(&p, end, buf, len);
371
memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
372
p += sizeof(monc->subs[i].item);
373
}
374
375
BUG_ON(p > end);
376
msg->front.iov_len = p - msg->front.iov_base;
377
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
378
ceph_msg_revoke(msg);
379
ceph_con_send(&monc->con, ceph_msg_get(msg));
380
}
381
382
static void handle_subscribe_ack(struct ceph_mon_client *monc,
383
struct ceph_msg *msg)
384
{
385
unsigned int seconds;
386
struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
387
388
if (msg->front.iov_len < sizeof(*h))
389
goto bad;
390
seconds = le32_to_cpu(h->duration);
391
392
mutex_lock(&monc->mutex);
393
if (monc->sub_renew_sent) {
394
/*
395
* This is only needed for legacy (infernalis or older)
396
* MONs -- see delayed_work().
397
*/
398
monc->sub_renew_after = monc->sub_renew_sent +
399
(seconds >> 1) * HZ - 1;
400
dout("%s sent %lu duration %d renew after %lu\n", __func__,
401
monc->sub_renew_sent, seconds, monc->sub_renew_after);
402
monc->sub_renew_sent = 0;
403
} else {
404
dout("%s sent %lu renew after %lu, ignoring\n", __func__,
405
monc->sub_renew_sent, monc->sub_renew_after);
406
}
407
mutex_unlock(&monc->mutex);
408
return;
409
bad:
410
pr_err("got corrupt subscribe-ack msg\n");
411
ceph_msg_dump(msg);
412
}
413
414
/*
415
* Register interest in a map
416
*
417
* @sub: one of CEPH_SUB_*
418
* @epoch: X for "every map since X", or 0 for "just the latest"
419
*/
420
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
421
u32 epoch, bool continuous)
422
{
423
__le64 start = cpu_to_le64(epoch);
424
u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
425
426
dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
427
epoch, continuous);
428
429
if (monc->subs[sub].want &&
430
monc->subs[sub].item.start == start &&
431
monc->subs[sub].item.flags == flags)
432
return false;
433
434
monc->subs[sub].item.start = start;
435
monc->subs[sub].item.flags = flags;
436
monc->subs[sub].want = true;
437
438
return true;
439
}
440
441
/*
 * Locked wrapper around __ceph_monc_want_map().  Returns true when the
 * caller should send a subscribe request (see ceph_monc_renew_subs()).
 */
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool changed;

	mutex_lock(&monc->mutex);
	changed = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return changed;
}
EXPORT_SYMBOL(ceph_monc_want_map);
453
454
/*
455
* Keep track of which maps we have
456
*
457
* @sub: one of CEPH_SUB_*
458
*/
459
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
460
u32 epoch)
461
{
462
dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
463
464
if (monc->subs[sub].want) {
465
if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
466
monc->subs[sub].want = false;
467
else
468
monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
469
}
470
471
monc->subs[sub].have = epoch;
472
}
473
474
/*
 * Locked wrapper around __ceph_monc_got_map().
 */
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	struct mutex *lock = &monc->mutex;

	mutex_lock(lock);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(lock);
}
EXPORT_SYMBOL(ceph_monc_got_map);
481
482
void ceph_monc_renew_subs(struct ceph_mon_client *monc)
483
{
484
mutex_lock(&monc->mutex);
485
__send_subscribe(monc);
486
mutex_unlock(&monc->mutex);
487
}
488
EXPORT_SYMBOL(ceph_monc_renew_subs);
489
490
/*
491
* Wait for an osdmap with a given epoch.
492
*
493
* @epoch: epoch to wait for
494
* @timeout: in jiffies, 0 means "wait forever"
495
*/
496
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
497
unsigned long timeout)
498
{
499
unsigned long started = jiffies;
500
long ret;
501
502
mutex_lock(&monc->mutex);
503
while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
504
mutex_unlock(&monc->mutex);
505
506
if (timeout && time_after_eq(jiffies, started + timeout))
507
return -ETIMEDOUT;
508
509
ret = wait_event_interruptible_timeout(monc->client->auth_wq,
510
monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
511
ceph_timeout_jiffies(timeout));
512
if (ret < 0)
513
return ret;
514
515
mutex_lock(&monc->mutex);
516
}
517
518
mutex_unlock(&monc->mutex);
519
return 0;
520
}
521
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
522
523
/*
524
* Open a session with a random monitor. Request monmap and osdmap,
525
* which are waited upon in __ceph_open_session().
526
*/
527
int ceph_monc_open_session(struct ceph_mon_client *monc)
528
{
529
mutex_lock(&monc->mutex);
530
__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
531
__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
532
__open_session(monc);
533
__schedule_delayed(monc);
534
mutex_unlock(&monc->mutex);
535
return 0;
536
}
537
EXPORT_SYMBOL(ceph_monc_open_session);
538
539
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
540
struct ceph_msg *msg)
541
{
542
struct ceph_client *client = monc->client;
543
struct ceph_monmap *monmap;
544
void *p, *end;
545
546
mutex_lock(&monc->mutex);
547
548
dout("handle_monmap\n");
549
p = msg->front.iov_base;
550
end = p + msg->front.iov_len;
551
552
monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client));
553
if (IS_ERR(monmap)) {
554
pr_err("problem decoding monmap, %d\n",
555
(int)PTR_ERR(monmap));
556
ceph_msg_dump(msg);
557
goto out;
558
}
559
560
if (ceph_check_fsid(client, &monmap->fsid) < 0) {
561
kfree(monmap);
562
goto out;
563
}
564
565
kfree(monc->monmap);
566
monc->monmap = monmap;
567
568
__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
569
client->have_fsid = true;
570
571
out:
572
mutex_unlock(&monc->mutex);
573
wake_up_all(&client->auth_wq);
574
}
575
576
/*
 * Generic requests (currently statfs, mon_get_version and mon_command).
 *
 * Outstanding requests live in monc->generic_request_tree, keyed by ->tid
 * and linked through ->node.  The macro below generates the
 * insert_generic_request(), erase_generic_request() and
 * lookup_generic_request() helpers used throughout this section.
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
580
581
static void release_generic_request(struct kref *kref)
582
{
583
struct ceph_mon_generic_request *req =
584
container_of(kref, struct ceph_mon_generic_request, kref);
585
586
dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
587
req->reply);
588
WARN_ON(!RB_EMPTY_NODE(&req->node));
589
590
if (req->reply)
591
ceph_msg_put(req->reply);
592
if (req->request)
593
ceph_msg_put(req->request);
594
595
kfree(req);
596
}
597
598
static void put_generic_request(struct ceph_mon_generic_request *req)
599
{
600
if (req)
601
kref_put(&req->kref, release_generic_request);
602
}
603
604
static void get_generic_request(struct ceph_mon_generic_request *req)
605
{
606
kref_get(&req->kref);
607
}
608
609
static struct ceph_mon_generic_request *
610
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
611
{
612
struct ceph_mon_generic_request *req;
613
614
req = kzalloc(sizeof(*req), gfp);
615
if (!req)
616
return NULL;
617
618
req->monc = monc;
619
kref_init(&req->kref);
620
RB_CLEAR_NODE(&req->node);
621
init_completion(&req->completion);
622
623
dout("%s greq %p\n", __func__, req);
624
return req;
625
}
626
627
static void register_generic_request(struct ceph_mon_generic_request *req)
628
{
629
struct ceph_mon_client *monc = req->monc;
630
631
WARN_ON(req->tid);
632
633
get_generic_request(req);
634
req->tid = ++monc->last_tid;
635
insert_generic_request(&monc->generic_request_tree, req);
636
}
637
638
static void send_generic_request(struct ceph_mon_client *monc,
639
struct ceph_mon_generic_request *req)
640
{
641
WARN_ON(!req->tid);
642
643
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
644
req->request->hdr.tid = cpu_to_le64(req->tid);
645
ceph_con_send(&monc->con, ceph_msg_get(req->request));
646
}
647
648
static void __finish_generic_request(struct ceph_mon_generic_request *req)
649
{
650
struct ceph_mon_client *monc = req->monc;
651
652
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
653
erase_generic_request(&monc->generic_request_tree, req);
654
655
ceph_msg_revoke(req->request);
656
ceph_msg_revoke_incoming(req->reply);
657
}
658
659
/*
 * Unregister @req (see __finish_generic_request()) and drop the reference
 * the tree held.
 */
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);	/* the tree's reference */
}
664
665
static void complete_generic_request(struct ceph_mon_generic_request *req)
666
{
667
if (req->complete_cb)
668
req->complete_cb(req);
669
else
670
complete_all(&req->completion);
671
put_generic_request(req);
672
}
673
674
static void cancel_generic_request(struct ceph_mon_generic_request *req)
675
{
676
struct ceph_mon_client *monc = req->monc;
677
struct ceph_mon_generic_request *lookup_req;
678
679
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
680
681
mutex_lock(&monc->mutex);
682
lookup_req = lookup_generic_request(&monc->generic_request_tree,
683
req->tid);
684
if (lookup_req) {
685
WARN_ON(lookup_req != req);
686
finish_generic_request(req);
687
}
688
689
mutex_unlock(&monc->mutex);
690
}
691
692
static int wait_generic_request(struct ceph_mon_generic_request *req)
693
{
694
int ret;
695
696
dout("%s greq %p tid %llu\n", __func__, req, req->tid);
697
ret = wait_for_completion_interruptible(&req->completion);
698
if (ret)
699
cancel_generic_request(req);
700
else
701
ret = req->result; /* completed */
702
703
return ret;
704
}
705
706
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
707
struct ceph_msg_header *hdr,
708
int *skip)
709
{
710
struct ceph_mon_client *monc = con->private;
711
struct ceph_mon_generic_request *req;
712
u64 tid = le64_to_cpu(hdr->tid);
713
struct ceph_msg *m;
714
715
mutex_lock(&monc->mutex);
716
req = lookup_generic_request(&monc->generic_request_tree, tid);
717
if (!req) {
718
dout("get_generic_reply %lld dne\n", tid);
719
*skip = 1;
720
m = NULL;
721
} else {
722
dout("get_generic_reply %lld got %p\n", tid, req->reply);
723
*skip = 0;
724
m = ceph_msg_get(req->reply);
725
/*
726
* we don't need to track the connection reading into
727
* this reply because we only have one open connection
728
* at a time, ever.
729
*/
730
}
731
mutex_unlock(&monc->mutex);
732
return m;
733
}
734
735
/*
736
* statfs
737
*/
738
static void handle_statfs_reply(struct ceph_mon_client *monc,
739
struct ceph_msg *msg)
740
{
741
struct ceph_mon_generic_request *req;
742
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
743
u64 tid = le64_to_cpu(msg->hdr.tid);
744
745
dout("%s msg %p tid %llu\n", __func__, msg, tid);
746
747
if (msg->front.iov_len != sizeof(*reply))
748
goto bad;
749
750
mutex_lock(&monc->mutex);
751
req = lookup_generic_request(&monc->generic_request_tree, tid);
752
if (!req) {
753
mutex_unlock(&monc->mutex);
754
return;
755
}
756
757
req->result = 0;
758
*req->u.st = reply->st; /* struct */
759
__finish_generic_request(req);
760
mutex_unlock(&monc->mutex);
761
762
complete_generic_request(req);
763
return;
764
765
bad:
766
pr_err("corrupt statfs reply, tid %llu\n", tid);
767
ceph_msg_dump(msg);
768
}
769
770
/*
771
* Do a synchronous statfs().
772
*/
773
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
774
struct ceph_statfs *buf)
775
{
776
struct ceph_mon_generic_request *req;
777
struct ceph_mon_statfs *h;
778
int ret = -ENOMEM;
779
780
req = alloc_generic_request(monc, GFP_NOFS);
781
if (!req)
782
goto out;
783
784
req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
785
true);
786
if (!req->request)
787
goto out;
788
789
req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
790
if (!req->reply)
791
goto out;
792
793
req->u.st = buf;
794
req->request->hdr.version = cpu_to_le16(2);
795
796
mutex_lock(&monc->mutex);
797
register_generic_request(req);
798
/* fill out request */
799
h = req->request->front.iov_base;
800
h->monhdr.have_version = 0;
801
h->monhdr.session_mon = cpu_to_le16(-1);
802
h->monhdr.session_mon_tid = 0;
803
h->fsid = monc->monmap->fsid;
804
h->contains_data_pool = (data_pool != CEPH_NOPOOL);
805
h->data_pool = cpu_to_le64(data_pool);
806
send_generic_request(monc, req);
807
mutex_unlock(&monc->mutex);
808
809
ret = wait_generic_request(req);
810
out:
811
put_generic_request(req);
812
return ret;
813
}
814
EXPORT_SYMBOL(ceph_monc_do_statfs);
815
816
static void handle_get_version_reply(struct ceph_mon_client *monc,
817
struct ceph_msg *msg)
818
{
819
struct ceph_mon_generic_request *req;
820
u64 tid = le64_to_cpu(msg->hdr.tid);
821
void *p = msg->front.iov_base;
822
void *end = p + msg->front_alloc_len;
823
u64 handle;
824
825
dout("%s msg %p tid %llu\n", __func__, msg, tid);
826
827
ceph_decode_need(&p, end, 2*sizeof(u64), bad);
828
handle = ceph_decode_64(&p);
829
if (tid != 0 && tid != handle)
830
goto bad;
831
832
mutex_lock(&monc->mutex);
833
req = lookup_generic_request(&monc->generic_request_tree, handle);
834
if (!req) {
835
mutex_unlock(&monc->mutex);
836
return;
837
}
838
839
req->result = 0;
840
req->u.newest = ceph_decode_64(&p);
841
__finish_generic_request(req);
842
mutex_unlock(&monc->mutex);
843
844
complete_generic_request(req);
845
return;
846
847
bad:
848
pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
849
ceph_msg_dump(msg);
850
}
851
852
static struct ceph_mon_generic_request *
853
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
854
ceph_monc_callback_t cb, u64 private_data)
855
{
856
struct ceph_mon_generic_request *req;
857
858
req = alloc_generic_request(monc, GFP_NOIO);
859
if (!req)
860
goto err_put_req;
861
862
req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
863
sizeof(u64) + sizeof(u32) + strlen(what),
864
GFP_NOIO, true);
865
if (!req->request)
866
goto err_put_req;
867
868
req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
869
true);
870
if (!req->reply)
871
goto err_put_req;
872
873
req->complete_cb = cb;
874
req->private_data = private_data;
875
876
mutex_lock(&monc->mutex);
877
register_generic_request(req);
878
{
879
void *p = req->request->front.iov_base;
880
void *const end = p + req->request->front_alloc_len;
881
882
ceph_encode_64(&p, req->tid); /* handle */
883
ceph_encode_string(&p, end, what, strlen(what));
884
WARN_ON(p != end);
885
}
886
send_generic_request(monc, req);
887
mutex_unlock(&monc->mutex);
888
889
return req;
890
891
err_put_req:
892
put_generic_request(req);
893
return ERR_PTR(-ENOMEM);
894
}
895
896
/*
897
* Send MMonGetVersion and wait for the reply.
898
*
899
* @what: one of "mdsmap", "osdmap" or "monmap"
900
*/
901
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
902
u64 *newest)
903
{
904
struct ceph_mon_generic_request *req;
905
int ret;
906
907
req = __ceph_monc_get_version(monc, what, NULL, 0);
908
if (IS_ERR(req))
909
return PTR_ERR(req);
910
911
ret = wait_generic_request(req);
912
if (!ret)
913
*newest = req->u.newest;
914
915
put_generic_request(req);
916
return ret;
917
}
918
EXPORT_SYMBOL(ceph_monc_get_version);
919
920
/*
921
* Send MMonGetVersion,
922
*
923
* @what: one of "mdsmap", "osdmap" or "monmap"
924
*/
925
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
926
ceph_monc_callback_t cb, u64 private_data)
927
{
928
struct ceph_mon_generic_request *req;
929
930
req = __ceph_monc_get_version(monc, what, cb, private_data);
931
if (IS_ERR(req))
932
return PTR_ERR(req);
933
934
put_generic_request(req);
935
return 0;
936
}
937
EXPORT_SYMBOL(ceph_monc_get_version_async);
938
939
static void handle_command_ack(struct ceph_mon_client *monc,
940
struct ceph_msg *msg)
941
{
942
struct ceph_mon_generic_request *req;
943
void *p = msg->front.iov_base;
944
void *const end = p + msg->front_alloc_len;
945
u64 tid = le64_to_cpu(msg->hdr.tid);
946
947
dout("%s msg %p tid %llu\n", __func__, msg, tid);
948
949
ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
950
sizeof(u32), bad);
951
p += sizeof(struct ceph_mon_request_header);
952
953
mutex_lock(&monc->mutex);
954
req = lookup_generic_request(&monc->generic_request_tree, tid);
955
if (!req) {
956
mutex_unlock(&monc->mutex);
957
return;
958
}
959
960
req->result = ceph_decode_32(&p);
961
__finish_generic_request(req);
962
mutex_unlock(&monc->mutex);
963
964
complete_generic_request(req);
965
return;
966
967
bad:
968
pr_err("corrupt mon_command ack, tid %llu\n", tid);
969
ceph_msg_dump(msg);
970
}
971
972
static __printf(2, 0)
/*
 * Send a MON_COMMAND whose single command string is formatted from @fmt
 * and @ap, then wait for the monitor's ack.
 *
 * The command string must fit in the fixed-size request front buffer.  A
 * string that does not fit is rejected with -E2BIG instead of overrunning
 * the buffer.  -E2BIG (not -EINVAL) is used so callers that treat -EINVAL
 * as "unknown command" do not retry.
 *
 * Only the bytes actually used are sent: the front length is trimmed to
 * the end of the command string, so the uninitialized tail of the
 * preallocated buffer never goes out on the wire.
 *
 * Returns the ack's result code, -ENOMEM on allocation failure, -E2BIG for
 * an oversized command, or the error from an interrupted wait.
 */
int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
			 va_list ap)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	char *front;
	int room;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	front = req->request->front.iov_base;
	h = req->request->front.iov_base;
	/* space left for the command string after the fixed header */
	room = front + req->request->front_alloc_len - h->str;

	/* format before registering so an oversized command is never sent */
	len = vsnprintf(h->str, room, fmt, ap);
	if (len < 0 || len >= room) {
		ret = -E2BIG;
		goto out;
	}

	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->num_strs = cpu_to_le32(1);
	h->str_len = cpu_to_le32(len);

	/* send the header plus the string, nothing more */
	req->request->front.iov_len = h->str + len - front;
	req->request->hdr.front_len = cpu_to_le32(req->request->front.iov_len);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h->fsid = monc->monmap->fsid;
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
1012
1013
static __printf(2, 3)
/*
 * Variadic front end for do_mon_command_vargs(): formats the command from
 * @fmt and its arguments, sends it and waits for the ack's result.
 */
int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
	va_list args;
	int err;

	va_start(args, fmt);
	err = do_mon_command_vargs(monc, fmt, args);
	va_end(args);

	return err;
}
1024
1025
/*
 * Ask the monitors to add @client_addr (address and nonce) to the OSD
 * blocklist.
 *
 * The "osd blocklist" command is tried first.  If it fails with -EINVAL,
 * the legacy "osd blacklist" command is tried instead.  On success, the
 * function then waits for the latest osdmap (see the comment in the body)
 * before returning.
 *
 * Returns 0 on success or a negative error code.
 */
int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	int ret;

	ret = do_mon_command(monc,
			     "{ \"prefix\": \"osd blocklist\", \
\"blocklistop\": \"add\", \
\"addr\": \"%pISpc/%u\" }",
			     &client_addr->in_addr,
			     le32_to_cpu(client_addr->nonce));
	if (ret == -EINVAL) {
		/*
		 * The monitor returns EINVAL on an unrecognized command.
		 * Try the legacy command -- it is exactly the same except
		 * for the name.
		 */
		ret = do_mon_command(monc,
				     "{ \"prefix\": \"osd blacklist\", \
\"blacklistop\": \"add\", \
\"addr\": \"%pISpc/%u\" }",
				     &client_addr->in_addr,
				     le32_to_cpu(client_addr->nonce));
	}
	if (ret)
		return ret;

	/*
	 * Make sure we have the osdmap that includes the blocklist
	 * entry. This is needed to ensure that the OSDs pick up the
	 * new blocklist before processing any future requests from
	 * this client.
	 */
	return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);
1061
1062
/*
1063
* Resend pending generic requests.
1064
*/
1065
static void __resend_generic_request(struct ceph_mon_client *monc)
1066
{
1067
struct ceph_mon_generic_request *req;
1068
struct rb_node *p;
1069
1070
for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
1071
req = rb_entry(p, struct ceph_mon_generic_request, node);
1072
ceph_msg_revoke(req->request);
1073
ceph_msg_revoke_incoming(req->reply);
1074
ceph_con_send(&monc->con, ceph_msg_get(req->request));
1075
}
1076
}
1077
1078
/*
1079
* Delayed work. If we haven't mounted yet, retry. Otherwise,
1080
* renew/retry subscription as needed (in case it is timing out, or we
1081
* got an ENOMEM). And keep the monitor connection alive.
1082
*/
1083
static void delayed_work(struct work_struct *work)
1084
{
1085
struct ceph_mon_client *monc =
1086
container_of(work, struct ceph_mon_client, delayed_work.work);
1087
1088
mutex_lock(&monc->mutex);
1089
dout("%s mon%d\n", __func__, monc->cur_mon);
1090
if (monc->cur_mon < 0) {
1091
goto out;
1092
}
1093
1094
if (monc->hunting) {
1095
dout("%s continuing hunt\n", __func__);
1096
reopen_session(monc);
1097
} else {
1098
int is_auth = ceph_auth_is_authenticated(monc->auth);
1099
1100
dout("%s is_authed %d\n", __func__, is_auth);
1101
if (ceph_con_keepalive_expired(&monc->con,
1102
CEPH_MONC_PING_TIMEOUT)) {
1103
dout("monc keepalive timeout\n");
1104
is_auth = 0;
1105
reopen_session(monc);
1106
}
1107
1108
if (!monc->hunting) {
1109
ceph_con_keepalive(&monc->con);
1110
__validate_auth(monc);
1111
un_backoff(monc);
1112
}
1113
1114
if (is_auth &&
1115
!(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
1116
unsigned long now = jiffies;
1117
1118
dout("%s renew subs? now %lu renew after %lu\n",
1119
__func__, now, monc->sub_renew_after);
1120
if (time_after_eq(now, monc->sub_renew_after))
1121
__send_subscribe(monc);
1122
}
1123
}
1124
__schedule_delayed(monc);
1125
1126
out:
1127
mutex_unlock(&monc->mutex);
1128
}
1129
1130
/*
1131
* On startup, we build a temporary monmap populated with the IPs
1132
* provided by mount(2).
1133
*/
1134
static int build_initial_monmap(struct ceph_mon_client *monc)
1135
{
1136
__le32 my_type = ceph_msgr2(monc->client) ?
1137
CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY;
1138
struct ceph_options *opt = monc->client->options;
1139
int num_mon = opt->num_mon;
1140
int i;
1141
1142
/* build initial monmap */
1143
monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
1144
GFP_KERNEL);
1145
if (!monc->monmap)
1146
return -ENOMEM;
1147
monc->monmap->num_mon = num_mon;
1148
1149
for (i = 0; i < num_mon; i++) {
1150
struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i];
1151
1152
memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr,
1153
sizeof(inst->addr.in_addr));
1154
inst->addr.type = my_type;
1155
inst->addr.nonce = 0;
1156
inst->name.type = CEPH_ENTITY_TYPE_MON;
1157
inst->name.num = cpu_to_le64(i);
1158
}
1159
return 0;
1160
}
1161
1162
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
1163
{
1164
int err;
1165
1166
dout("init\n");
1167
memset(monc, 0, sizeof(*monc));
1168
monc->client = cl;
1169
mutex_init(&monc->mutex);
1170
1171
err = build_initial_monmap(monc);
1172
if (err)
1173
goto out;
1174
1175
/* connection */
1176
/* authentication */
1177
monc->auth = ceph_auth_init(cl->options->name, cl->options->key,
1178
cl->options->con_modes);
1179
if (IS_ERR(monc->auth)) {
1180
err = PTR_ERR(monc->auth);
1181
goto out_monmap;
1182
}
1183
monc->auth->want_keys =
1184
CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
1185
CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
1186
1187
/* msgs */
1188
err = -ENOMEM;
1189
monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
1190
sizeof(struct ceph_mon_subscribe_ack),
1191
GFP_KERNEL, true);
1192
if (!monc->m_subscribe_ack)
1193
goto out_auth;
1194
1195
monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
1196
GFP_KERNEL, true);
1197
if (!monc->m_subscribe)
1198
goto out_subscribe_ack;
1199
1200
monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
1201
GFP_KERNEL, true);
1202
if (!monc->m_auth_reply)
1203
goto out_subscribe;
1204
1205
monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
1206
monc->pending_auth = 0;
1207
if (!monc->m_auth)
1208
goto out_auth_reply;
1209
1210
ceph_con_init(&monc->con, monc, &mon_con_ops,
1211
&monc->client->msgr);
1212
1213
monc->cur_mon = -1;
1214
monc->had_a_connection = false;
1215
monc->hunt_mult = 1;
1216
1217
INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
1218
monc->generic_request_tree = RB_ROOT;
1219
monc->last_tid = 0;
1220
1221
monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
1222
1223
return 0;
1224
1225
out_auth_reply:
1226
ceph_msg_put(monc->m_auth_reply);
1227
out_subscribe:
1228
ceph_msg_put(monc->m_subscribe);
1229
out_subscribe_ack:
1230
ceph_msg_put(monc->m_subscribe_ack);
1231
out_auth:
1232
ceph_auth_destroy(monc->auth);
1233
out_monmap:
1234
kfree(monc->monmap);
1235
out:
1236
return err;
1237
}
1238
EXPORT_SYMBOL(ceph_monc_init);
1239
1240
void ceph_monc_stop(struct ceph_mon_client *monc)
1241
{
1242
dout("stop\n");
1243
1244
mutex_lock(&monc->mutex);
1245
__close_session(monc);
1246
monc->hunting = false;
1247
monc->cur_mon = -1;
1248
mutex_unlock(&monc->mutex);
1249
1250
cancel_delayed_work_sync(&monc->delayed_work);
1251
1252
/*
1253
* flush msgr queue before we destroy ourselves to ensure that:
1254
* - any work that references our embedded con is finished.
1255
* - any osd_client or other work that may reference an authorizer
1256
* finishes before we shut down the auth subsystem.
1257
*/
1258
ceph_msgr_flush();
1259
1260
ceph_auth_destroy(monc->auth);
1261
1262
WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
1263
1264
ceph_msg_put(monc->m_auth);
1265
ceph_msg_put(monc->m_auth_reply);
1266
ceph_msg_put(monc->m_subscribe);
1267
ceph_msg_put(monc->m_subscribe_ack);
1268
1269
kfree(monc->monmap);
1270
}
1271
EXPORT_SYMBOL(ceph_monc_stop);
1272
1273
static void finish_hunting(struct ceph_mon_client *monc)
1274
{
1275
if (monc->hunting) {
1276
dout("%s found mon%d\n", __func__, monc->cur_mon);
1277
monc->hunting = false;
1278
monc->had_a_connection = true;
1279
un_backoff(monc);
1280
__schedule_delayed(monc);
1281
}
1282
}
1283
1284
static void finish_auth(struct ceph_mon_client *monc, int auth_err,
1285
bool was_authed)
1286
{
1287
dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
1288
WARN_ON(auth_err > 0);
1289
1290
monc->pending_auth = 0;
1291
if (auth_err) {
1292
monc->client->auth_err = auth_err;
1293
wake_up_all(&monc->client->auth_wq);
1294
return;
1295
}
1296
1297
if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
1298
dout("%s authenticated, starting session global_id %llu\n",
1299
__func__, monc->auth->global_id);
1300
1301
monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
1302
monc->client->msgr.inst.name.num =
1303
cpu_to_le64(monc->auth->global_id);
1304
1305
__send_subscribe(monc);
1306
__resend_generic_request(monc);
1307
1308
pr_info("mon%d %s session established\n", monc->cur_mon,
1309
ceph_pr_addr(&monc->con.peer_addr));
1310
}
1311
}
1312
1313
static void handle_auth_reply(struct ceph_mon_client *monc,
1314
struct ceph_msg *msg)
1315
{
1316
bool was_authed;
1317
int ret;
1318
1319
mutex_lock(&monc->mutex);
1320
was_authed = ceph_auth_is_authenticated(monc->auth);
1321
ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
1322
msg->front.iov_len,
1323
monc->m_auth->front.iov_base,
1324
monc->m_auth->front_alloc_len);
1325
if (ret > 0) {
1326
__send_prepared_auth_request(monc, ret);
1327
} else {
1328
finish_auth(monc, ret, was_authed);
1329
finish_hunting(monc);
1330
}
1331
mutex_unlock(&monc->mutex);
1332
}
1333
1334
static int __validate_auth(struct ceph_mon_client *monc)
1335
{
1336
int ret;
1337
1338
if (monc->pending_auth)
1339
return 0;
1340
1341
ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
1342
monc->m_auth->front_alloc_len);
1343
if (ret <= 0)
1344
return ret; /* either an error, or no need to authenticate */
1345
__send_prepared_auth_request(monc, ret);
1346
return 0;
1347
}
1348
1349
int ceph_monc_validate_auth(struct ceph_mon_client *monc)
1350
{
1351
int ret;
1352
1353
mutex_lock(&monc->mutex);
1354
ret = __validate_auth(monc);
1355
mutex_unlock(&monc->mutex);
1356
return ret;
1357
}
1358
EXPORT_SYMBOL(ceph_monc_validate_auth);
1359
1360
static int mon_get_auth_request(struct ceph_connection *con,
1361
void *buf, int *buf_len,
1362
void **authorizer, int *authorizer_len)
1363
{
1364
struct ceph_mon_client *monc = con->private;
1365
int ret;
1366
1367
mutex_lock(&monc->mutex);
1368
ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
1369
mutex_unlock(&monc->mutex);
1370
if (ret < 0)
1371
return ret;
1372
1373
*buf_len = ret;
1374
*authorizer = NULL;
1375
*authorizer_len = 0;
1376
return 0;
1377
}
1378
1379
static int mon_handle_auth_reply_more(struct ceph_connection *con,
1380
void *reply, int reply_len,
1381
void *buf, int *buf_len,
1382
void **authorizer, int *authorizer_len)
1383
{
1384
struct ceph_mon_client *monc = con->private;
1385
int ret;
1386
1387
mutex_lock(&monc->mutex);
1388
ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len,
1389
buf, *buf_len);
1390
mutex_unlock(&monc->mutex);
1391
if (ret < 0)
1392
return ret;
1393
1394
*buf_len = ret;
1395
*authorizer = NULL;
1396
*authorizer_len = 0;
1397
return 0;
1398
}
1399
1400
static int mon_handle_auth_done(struct ceph_connection *con,
1401
u64 global_id, void *reply, int reply_len,
1402
u8 *session_key, int *session_key_len,
1403
u8 *con_secret, int *con_secret_len)
1404
{
1405
struct ceph_mon_client *monc = con->private;
1406
bool was_authed;
1407
int ret;
1408
1409
mutex_lock(&monc->mutex);
1410
WARN_ON(!monc->hunting);
1411
was_authed = ceph_auth_is_authenticated(monc->auth);
1412
ret = ceph_auth_handle_reply_done(monc->auth, global_id,
1413
reply, reply_len,
1414
session_key, session_key_len,
1415
con_secret, con_secret_len);
1416
finish_auth(monc, ret, was_authed);
1417
if (!ret)
1418
finish_hunting(monc);
1419
mutex_unlock(&monc->mutex);
1420
return 0;
1421
}
1422
1423
static int mon_handle_auth_bad_method(struct ceph_connection *con,
1424
int used_proto, int result,
1425
const int *allowed_protos, int proto_cnt,
1426
const int *allowed_modes, int mode_cnt)
1427
{
1428
struct ceph_mon_client *monc = con->private;
1429
bool was_authed;
1430
1431
mutex_lock(&monc->mutex);
1432
WARN_ON(!monc->hunting);
1433
was_authed = ceph_auth_is_authenticated(monc->auth);
1434
ceph_auth_handle_bad_method(monc->auth, used_proto, result,
1435
allowed_protos, proto_cnt,
1436
allowed_modes, mode_cnt);
1437
finish_auth(monc, -EACCES, was_authed);
1438
mutex_unlock(&monc->mutex);
1439
return 0;
1440
}
1441
1442
/*
1443
* handle incoming message
1444
*/
1445
static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1446
{
1447
struct ceph_mon_client *monc = con->private;
1448
int type = le16_to_cpu(msg->hdr.type);
1449
1450
switch (type) {
1451
case CEPH_MSG_AUTH_REPLY:
1452
handle_auth_reply(monc, msg);
1453
break;
1454
1455
case CEPH_MSG_MON_SUBSCRIBE_ACK:
1456
handle_subscribe_ack(monc, msg);
1457
break;
1458
1459
case CEPH_MSG_STATFS_REPLY:
1460
handle_statfs_reply(monc, msg);
1461
break;
1462
1463
case CEPH_MSG_MON_GET_VERSION_REPLY:
1464
handle_get_version_reply(monc, msg);
1465
break;
1466
1467
case CEPH_MSG_MON_COMMAND_ACK:
1468
handle_command_ack(monc, msg);
1469
break;
1470
1471
case CEPH_MSG_MON_MAP:
1472
ceph_monc_handle_map(monc, msg);
1473
break;
1474
1475
case CEPH_MSG_OSD_MAP:
1476
ceph_osdc_handle_map(&monc->client->osdc, msg);
1477
break;
1478
1479
default:
1480
/* can the chained handler handle it? */
1481
if (monc->client->extra_mon_dispatch &&
1482
monc->client->extra_mon_dispatch(monc->client, msg) == 0)
1483
break;
1484
1485
pr_err("received unknown message type %d %s\n", type,
1486
ceph_msg_type_name(type));
1487
}
1488
ceph_msg_put(msg);
1489
}
1490
1491
/*
1492
* Allocate memory for incoming message
1493
*/
1494
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
1495
struct ceph_msg_header *hdr,
1496
int *skip)
1497
{
1498
struct ceph_mon_client *monc = con->private;
1499
int type = le16_to_cpu(hdr->type);
1500
int front_len = le32_to_cpu(hdr->front_len);
1501
struct ceph_msg *m = NULL;
1502
1503
*skip = 0;
1504
1505
switch (type) {
1506
case CEPH_MSG_MON_SUBSCRIBE_ACK:
1507
m = ceph_msg_get(monc->m_subscribe_ack);
1508
break;
1509
case CEPH_MSG_STATFS_REPLY:
1510
case CEPH_MSG_MON_COMMAND_ACK:
1511
return get_generic_reply(con, hdr, skip);
1512
case CEPH_MSG_AUTH_REPLY:
1513
m = ceph_msg_get(monc->m_auth_reply);
1514
break;
1515
case CEPH_MSG_MON_GET_VERSION_REPLY:
1516
if (le64_to_cpu(hdr->tid) != 0)
1517
return get_generic_reply(con, hdr, skip);
1518
1519
/*
1520
* Older OSDs don't set reply tid even if the original
1521
* request had a non-zero tid. Work around this weirdness
1522
* by allocating a new message.
1523
*/
1524
fallthrough;
1525
case CEPH_MSG_MON_MAP:
1526
case CEPH_MSG_MDS_MAP:
1527
case CEPH_MSG_OSD_MAP:
1528
case CEPH_MSG_FS_MAP_USER:
1529
m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1530
if (!m)
1531
return NULL; /* ENOMEM--return skip == 0 */
1532
break;
1533
}
1534
1535
if (!m) {
1536
pr_info("alloc_msg unknown type %d\n", type);
1537
*skip = 1;
1538
} else if (front_len > m->front_alloc_len) {
1539
pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
1540
front_len, m->front_alloc_len,
1541
(unsigned int)con->peer_name.type,
1542
le64_to_cpu(con->peer_name.num));
1543
ceph_msg_put(m);
1544
m = ceph_msg_new(type, front_len, GFP_NOFS, false);
1545
}
1546
1547
return m;
1548
}
1549
1550
/*
1551
* If the monitor connection resets, pick a new monitor and resubmit
1552
* any pending requests.
1553
*/
1554
static void mon_fault(struct ceph_connection *con)
1555
{
1556
struct ceph_mon_client *monc = con->private;
1557
1558
mutex_lock(&monc->mutex);
1559
dout("%s mon%d\n", __func__, monc->cur_mon);
1560
if (monc->cur_mon >= 0) {
1561
if (!monc->hunting) {
1562
dout("%s hunting for new mon\n", __func__);
1563
reopen_session(monc);
1564
__schedule_delayed(monc);
1565
} else {
1566
dout("%s already hunting\n", __func__);
1567
}
1568
}
1569
mutex_unlock(&monc->mutex);
1570
}
1571
1572
/*
 * Connection refcounting hooks for the monitor connection: both are no-ops.
 *
 * The connection is embedded in struct ceph_mon_client (monc->con), so its
 * lifetime is that of the mon client.  All references come from the
 * messenger work, and ceph_monc_stop() flushes the messenger
 * (ceph_msgr_flush()) before tearing the client down.
 */
static struct ceph_connection *mon_get_con(struct ceph_connection *con)
{
	/* no reference is taken; simply hand the connection back */
	return con;
}

static void mon_put_con(struct ceph_connection *con)
{
	/* nothing to drop: mon_get_con() took no reference */
}
1585
1586
static const struct ceph_connection_operations mon_con_ops = {
1587
.get = mon_get_con,
1588
.put = mon_put_con,
1589
.alloc_msg = mon_alloc_msg,
1590
.dispatch = mon_dispatch,
1591
.fault = mon_fault,
1592
.get_auth_request = mon_get_auth_request,
1593
.handle_auth_reply_more = mon_handle_auth_reply_more,
1594
.handle_auth_done = mon_handle_auth_done,
1595
.handle_auth_bad_method = mon_handle_auth_bad_method,
1596
};
1597
1598