Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/net/ceph/messenger_v1.c
26282 views
1
// SPDX-License-Identifier: GPL-2.0
2
#include <linux/ceph/ceph_debug.h>
3
4
#include <linux/bvec.h>
5
#include <linux/crc32c.h>
6
#include <linux/net.h>
7
#include <linux/socket.h>
8
#include <net/sock.h>
9
10
#include <linux/ceph/ceph_features.h>
11
#include <linux/ceph/decode.h>
12
#include <linux/ceph/libceph.h>
13
#include <linux/ceph/messenger.h>
14
15
/* static tag bytes (protocol control messages) */
16
static char tag_msg = CEPH_MSGR_TAG_MSG;
17
static char tag_ack = CEPH_MSGR_TAG_ACK;
18
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
19
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
20
21
/*
22
* If @buf is NULL, discard up to @len bytes.
23
*/
24
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
25
{
26
struct kvec iov = {buf, len};
27
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
28
int r;
29
30
if (!buf)
31
msg.msg_flags |= MSG_TRUNC;
32
33
iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len);
34
r = sock_recvmsg(sock, &msg, msg.msg_flags);
35
if (r == -EAGAIN)
36
r = 0;
37
return r;
38
}
39
40
/*
 * Non-blocking receive of up to @length bytes into @page, starting at
 * @page_offset.  The target range must not extend past the end of the
 * page (enforced with BUG_ON).
 *
 * Returns whatever sock_recvmsg() returns, except that -EAGAIN is
 * reported as 0.
 */
static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
			     int page_offset, size_t length)
{
	struct msghdr hdr = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	struct bio_vec seg;
	int nread;

	BUG_ON(page_offset + length > PAGE_SIZE);

	bvec_set_page(&seg, page, length, page_offset);
	iov_iter_bvec(&hdr.msg_iter, ITER_DEST, &seg, 1, length);

	nread = sock_recvmsg(sock, &hdr, hdr.msg_flags);
	return nread == -EAGAIN ? 0 : nread;
}
55
56
/*
57
* write something. @more is true if caller will be sending more data
58
* shortly.
59
*/
60
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
61
size_t kvlen, size_t len, bool more)
62
{
63
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
64
int r;
65
66
if (more)
67
msg.msg_flags |= MSG_MORE;
68
else
69
msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
70
71
r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
72
if (r == -EAGAIN)
73
r = 0;
74
return r;
75
}
76
77
/*
78
* @more: MSG_MORE or 0.
79
*/
80
static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
81
int offset, size_t size, int more)
82
{
83
struct msghdr msg = {
84
.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
85
};
86
struct bio_vec bvec;
87
int ret;
88
89
/*
90
* MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
91
* we need to fall back to sendmsg if that's the case.
92
*
93
* Same goes for slab pages: skb_can_coalesce() allows
94
* coalescing neighboring slab objects into a single frag which
95
* triggers one of hardened usercopy checks.
96
*/
97
if (sendpage_ok(page))
98
msg.msg_flags |= MSG_SPLICE_PAGES;
99
100
bvec_set_page(&bvec, page, size, offset);
101
iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
102
103
ret = sock_sendmsg(sock, &msg);
104
if (ret == -EAGAIN)
105
ret = 0;
106
107
return ret;
108
}
109
110
static void con_out_kvec_reset(struct ceph_connection *con)
111
{
112
BUG_ON(con->v1.out_skip);
113
114
con->v1.out_kvec_left = 0;
115
con->v1.out_kvec_bytes = 0;
116
con->v1.out_kvec_cur = &con->v1.out_kvec[0];
117
}
118
119
static void con_out_kvec_add(struct ceph_connection *con,
120
size_t size, void *data)
121
{
122
int index = con->v1.out_kvec_left;
123
124
BUG_ON(con->v1.out_skip);
125
BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
126
127
con->v1.out_kvec[index].iov_len = size;
128
con->v1.out_kvec[index].iov_base = data;
129
con->v1.out_kvec_left++;
130
con->v1.out_kvec_bytes += size;
131
}
132
133
/*
134
* Chop off a kvec from the end. Return residual number of bytes for
135
* that kvec, i.e. how many bytes would have been written if the kvec
136
* hadn't been nuked.
137
*/
138
static int con_out_kvec_skip(struct ceph_connection *con)
139
{
140
int skip = 0;
141
142
if (con->v1.out_kvec_bytes > 0) {
143
skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
144
BUG_ON(con->v1.out_kvec_bytes < skip);
145
BUG_ON(!con->v1.out_kvec_left);
146
con->v1.out_kvec_bytes -= skip;
147
con->v1.out_kvec_left--;
148
}
149
150
return skip;
151
}
152
153
static size_t sizeof_footer(struct ceph_connection *con)
154
{
155
return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
156
sizeof(struct ceph_msg_footer) :
157
sizeof(struct ceph_msg_footer_old);
158
}
159
160
/*
 * Initialize @msg's data cursor.  The cursor length is
 * msg->sparse_read_total when that is non-zero (sparse read), and
 * @data_len otherwise.
 */
static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
	u64 cursor_len = msg->sparse_read_total ? msg->sparse_read_total
						: data_len;

	ceph_msg_data_cursor_init(&msg->cursor, msg, cursor_len);
}
167
168
/*
169
* Prepare footer for currently outgoing message, and finish things
170
* off. Assumes out_kvec* are already valid.. we just add on to the end.
171
*/
172
static void prepare_write_message_footer(struct ceph_connection *con)
173
{
174
struct ceph_msg *m = con->out_msg;
175
176
m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
177
178
dout("prepare_write_message_footer %p\n", con);
179
con_out_kvec_add(con, sizeof_footer(con), &m->footer);
180
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
181
if (con->ops->sign_message)
182
con->ops->sign_message(m);
183
else
184
m->footer.sig = 0;
185
} else {
186
m->old_footer.flags = m->footer.flags;
187
}
188
con->v1.out_more = m->more_to_follow;
189
con->v1.out_msg_done = true;
190
}
191
192
/*
193
* Prepare headers for the next outgoing message.
194
*/
195
static void prepare_write_message(struct ceph_connection *con)
196
{
197
struct ceph_msg *m;
198
u32 crc;
199
200
con_out_kvec_reset(con);
201
con->v1.out_msg_done = false;
202
203
/* Sneak an ack in there first? If we can get it into the same
204
* TCP packet that's a good thing. */
205
if (con->in_seq > con->in_seq_acked) {
206
con->in_seq_acked = con->in_seq;
207
con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
208
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
209
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
210
&con->v1.out_temp_ack);
211
}
212
213
ceph_con_get_out_msg(con);
214
m = con->out_msg;
215
216
dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
217
m, con->out_seq, le16_to_cpu(m->hdr.type),
218
le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
219
m->data_length);
220
WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
221
WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
222
223
/* tag + hdr + front + middle */
224
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
225
con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
226
con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
227
228
if (m->middle)
229
con_out_kvec_add(con, m->middle->vec.iov_len,
230
m->middle->vec.iov_base);
231
232
/* fill in hdr crc and finalize hdr */
233
crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
234
con->out_msg->hdr.crc = cpu_to_le32(crc);
235
memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
236
237
/* fill in front and middle crc, footer */
238
crc = crc32c(0, m->front.iov_base, m->front.iov_len);
239
con->out_msg->footer.front_crc = cpu_to_le32(crc);
240
if (m->middle) {
241
crc = crc32c(0, m->middle->vec.iov_base,
242
m->middle->vec.iov_len);
243
con->out_msg->footer.middle_crc = cpu_to_le32(crc);
244
} else
245
con->out_msg->footer.middle_crc = 0;
246
dout("%s front_crc %u middle_crc %u\n", __func__,
247
le32_to_cpu(con->out_msg->footer.front_crc),
248
le32_to_cpu(con->out_msg->footer.middle_crc));
249
con->out_msg->footer.flags = 0;
250
251
/* is there a data payload? */
252
con->out_msg->footer.data_crc = 0;
253
if (m->data_length) {
254
prepare_message_data(con->out_msg, m->data_length);
255
con->v1.out_more = 1; /* data + footer will follow */
256
} else {
257
/* no, queue up footer too and be done */
258
prepare_write_message_footer(con);
259
}
260
261
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
262
}
263
264
/*
265
* Prepare an ack.
266
*/
267
static void prepare_write_ack(struct ceph_connection *con)
268
{
269
dout("prepare_write_ack %p %llu -> %llu\n", con,
270
con->in_seq_acked, con->in_seq);
271
con->in_seq_acked = con->in_seq;
272
273
con_out_kvec_reset(con);
274
275
con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
276
277
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
278
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
279
&con->v1.out_temp_ack);
280
281
con->v1.out_more = 1; /* more will follow.. eventually.. */
282
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
283
}
284
285
/*
286
* Prepare to share the seq during handshake
287
*/
288
static void prepare_write_seq(struct ceph_connection *con)
289
{
290
dout("prepare_write_seq %p %llu -> %llu\n", con,
291
con->in_seq_acked, con->in_seq);
292
con->in_seq_acked = con->in_seq;
293
294
con_out_kvec_reset(con);
295
296
con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
297
con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
298
&con->v1.out_temp_ack);
299
300
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
301
}
302
303
/*
304
* Prepare to write keepalive byte.
305
*/
306
static void prepare_write_keepalive(struct ceph_connection *con)
307
{
308
dout("prepare_write_keepalive %p\n", con);
309
con_out_kvec_reset(con);
310
if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
311
struct timespec64 now;
312
313
ktime_get_real_ts64(&now);
314
con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
315
ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
316
con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
317
&con->v1.out_temp_keepalive2);
318
} else {
319
con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
320
}
321
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
322
}
323
324
/*
325
* Connection negotiation.
326
*/
327
328
static int get_connect_authorizer(struct ceph_connection *con)
329
{
330
struct ceph_auth_handshake *auth;
331
int auth_proto;
332
333
if (!con->ops->get_authorizer) {
334
con->v1.auth = NULL;
335
con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
336
con->v1.out_connect.authorizer_len = 0;
337
return 0;
338
}
339
340
auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
341
if (IS_ERR(auth))
342
return PTR_ERR(auth);
343
344
con->v1.auth = auth;
345
con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
346
con->v1.out_connect.authorizer_len =
347
cpu_to_le32(auth->authorizer_buf_len);
348
return 0;
349
}
350
351
/*
352
* We connected to a peer and are saying hello.
353
*/
354
static void prepare_write_banner(struct ceph_connection *con)
355
{
356
con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
357
con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
358
&con->msgr->my_enc_addr);
359
360
con->v1.out_more = 0;
361
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
362
}
363
364
static void __prepare_write_connect(struct ceph_connection *con)
365
{
366
con_out_kvec_add(con, sizeof(con->v1.out_connect),
367
&con->v1.out_connect);
368
if (con->v1.auth)
369
con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
370
con->v1.auth->authorizer_buf);
371
372
con->v1.out_more = 0;
373
ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
374
}
375
376
static int prepare_write_connect(struct ceph_connection *con)
377
{
378
unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
379
int proto;
380
int ret;
381
382
switch (con->peer_name.type) {
383
case CEPH_ENTITY_TYPE_MON:
384
proto = CEPH_MONC_PROTOCOL;
385
break;
386
case CEPH_ENTITY_TYPE_OSD:
387
proto = CEPH_OSDC_PROTOCOL;
388
break;
389
case CEPH_ENTITY_TYPE_MDS:
390
proto = CEPH_MDSC_PROTOCOL;
391
break;
392
default:
393
BUG();
394
}
395
396
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
397
con->v1.connect_seq, global_seq, proto);
398
399
con->v1.out_connect.features =
400
cpu_to_le64(from_msgr(con->msgr)->supported_features);
401
con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
402
con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
403
con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
404
con->v1.out_connect.protocol_version = cpu_to_le32(proto);
405
con->v1.out_connect.flags = 0;
406
407
ret = get_connect_authorizer(con);
408
if (ret)
409
return ret;
410
411
__prepare_write_connect(con);
412
return 0;
413
}
414
415
/*
416
* write as much of pending kvecs to the socket as we can.
417
* 1 -> done
418
* 0 -> socket full, but more to do
419
* <0 -> error
420
*/
421
static int write_partial_kvec(struct ceph_connection *con)
422
{
423
int ret;
424
425
dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
426
while (con->v1.out_kvec_bytes > 0) {
427
ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
428
con->v1.out_kvec_left,
429
con->v1.out_kvec_bytes,
430
con->v1.out_more);
431
if (ret <= 0)
432
goto out;
433
con->v1.out_kvec_bytes -= ret;
434
if (!con->v1.out_kvec_bytes)
435
break; /* done */
436
437
/* account for full iov entries consumed */
438
while (ret >= con->v1.out_kvec_cur->iov_len) {
439
BUG_ON(!con->v1.out_kvec_left);
440
ret -= con->v1.out_kvec_cur->iov_len;
441
con->v1.out_kvec_cur++;
442
con->v1.out_kvec_left--;
443
}
444
/* and for a partially-consumed entry */
445
if (ret) {
446
con->v1.out_kvec_cur->iov_len -= ret;
447
con->v1.out_kvec_cur->iov_base += ret;
448
}
449
}
450
con->v1.out_kvec_left = 0;
451
ret = 1;
452
out:
453
dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
454
con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
455
return ret; /* done! */
456
}
457
458
/*
459
* Write as much message data payload as we can. If we finish, queue
460
* up the footer.
461
* 1 -> done, footer is now queued in out_kvec[].
462
* 0 -> socket full, but more to do
463
* <0 -> error
464
*/
465
static int write_partial_message_data(struct ceph_connection *con)
466
{
467
struct ceph_msg *msg = con->out_msg;
468
struct ceph_msg_data_cursor *cursor = &msg->cursor;
469
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
470
u32 crc;
471
472
dout("%s %p msg %p\n", __func__, con, msg);
473
474
if (!msg->num_data_items)
475
return -EINVAL;
476
477
/*
478
* Iterate through each page that contains data to be
479
* written, and send as much as possible for each.
480
*
481
* If we are calculating the data crc (the default), we will
482
* need to map the page. If we have no pages, they have
483
* been revoked, so use the zero page.
484
*/
485
crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
486
while (cursor->total_resid) {
487
struct page *page;
488
size_t page_offset;
489
size_t length;
490
int ret;
491
492
if (!cursor->resid) {
493
ceph_msg_data_advance(cursor, 0);
494
continue;
495
}
496
497
page = ceph_msg_data_next(cursor, &page_offset, &length);
498
ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
499
MSG_MORE);
500
if (ret <= 0) {
501
if (do_datacrc)
502
msg->footer.data_crc = cpu_to_le32(crc);
503
504
return ret;
505
}
506
if (do_datacrc && cursor->need_crc)
507
crc = ceph_crc32c_page(crc, page, page_offset, length);
508
ceph_msg_data_advance(cursor, (size_t)ret);
509
}
510
511
dout("%s %p msg %p done\n", __func__, con, msg);
512
513
/* prepare and queue up footer, too */
514
if (do_datacrc)
515
msg->footer.data_crc = cpu_to_le32(crc);
516
else
517
msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
518
con_out_kvec_reset(con);
519
prepare_write_message_footer(con);
520
521
return 1; /* must return > 0 to indicate success */
522
}
523
524
/*
525
* write some zeros
526
*/
527
static int write_partial_skip(struct ceph_connection *con)
528
{
529
int ret;
530
531
dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
532
while (con->v1.out_skip > 0) {
533
size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
534
535
ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
536
MSG_MORE);
537
if (ret <= 0)
538
goto out;
539
con->v1.out_skip -= ret;
540
}
541
ret = 1;
542
out:
543
return ret;
544
}
545
546
/*
547
* Prepare to read connection handshake, or an ack.
548
*/
549
static void prepare_read_banner(struct ceph_connection *con)
550
{
551
dout("prepare_read_banner %p\n", con);
552
con->v1.in_base_pos = 0;
553
}
554
555
static void prepare_read_connect(struct ceph_connection *con)
556
{
557
dout("prepare_read_connect %p\n", con);
558
con->v1.in_base_pos = 0;
559
}
560
561
static void prepare_read_ack(struct ceph_connection *con)
562
{
563
dout("prepare_read_ack %p\n", con);
564
con->v1.in_base_pos = 0;
565
}
566
567
static void prepare_read_seq(struct ceph_connection *con)
568
{
569
dout("prepare_read_seq %p\n", con);
570
con->v1.in_base_pos = 0;
571
con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
572
}
573
574
static void prepare_read_tag(struct ceph_connection *con)
575
{
576
dout("prepare_read_tag %p\n", con);
577
con->v1.in_base_pos = 0;
578
con->v1.in_tag = CEPH_MSGR_TAG_READY;
579
}
580
581
static void prepare_read_keepalive_ack(struct ceph_connection *con)
582
{
583
dout("prepare_read_keepalive_ack %p\n", con);
584
con->v1.in_base_pos = 0;
585
}
586
587
/*
588
* Prepare to read a message.
589
*/
590
static int prepare_read_message(struct ceph_connection *con)
591
{
592
dout("prepare_read_message %p\n", con);
593
BUG_ON(con->in_msg != NULL);
594
con->v1.in_base_pos = 0;
595
con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
596
return 0;
597
}
598
599
static int read_partial(struct ceph_connection *con,
600
int end, int size, void *object)
601
{
602
while (con->v1.in_base_pos < end) {
603
int left = end - con->v1.in_base_pos;
604
int have = size - left;
605
int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
606
if (ret <= 0)
607
return ret;
608
con->v1.in_base_pos += ret;
609
}
610
return 1;
611
}
612
613
/*
614
* Read all or part of the connect-side handshake on a new connection
615
*/
616
static int read_partial_banner(struct ceph_connection *con)
617
{
618
int size;
619
int end;
620
int ret;
621
622
dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
623
624
/* peer's banner */
625
size = strlen(CEPH_BANNER);
626
end = size;
627
ret = read_partial(con, end, size, con->v1.in_banner);
628
if (ret <= 0)
629
goto out;
630
631
size = sizeof(con->v1.actual_peer_addr);
632
end += size;
633
ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
634
if (ret <= 0)
635
goto out;
636
ceph_decode_banner_addr(&con->v1.actual_peer_addr);
637
638
size = sizeof(con->v1.peer_addr_for_me);
639
end += size;
640
ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
641
if (ret <= 0)
642
goto out;
643
ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
644
645
out:
646
return ret;
647
}
648
649
static int read_partial_connect(struct ceph_connection *con)
650
{
651
int size;
652
int end;
653
int ret;
654
655
dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
656
657
size = sizeof(con->v1.in_reply);
658
end = size;
659
ret = read_partial(con, end, size, &con->v1.in_reply);
660
if (ret <= 0)
661
goto out;
662
663
if (con->v1.auth) {
664
size = le32_to_cpu(con->v1.in_reply.authorizer_len);
665
if (size > con->v1.auth->authorizer_reply_buf_len) {
666
pr_err("authorizer reply too big: %d > %zu\n", size,
667
con->v1.auth->authorizer_reply_buf_len);
668
ret = -EINVAL;
669
goto out;
670
}
671
672
end += size;
673
ret = read_partial(con, end, size,
674
con->v1.auth->authorizer_reply_buf);
675
if (ret <= 0)
676
goto out;
677
}
678
679
dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
680
con, con->v1.in_reply.tag,
681
le32_to_cpu(con->v1.in_reply.connect_seq),
682
le32_to_cpu(con->v1.in_reply.global_seq));
683
out:
684
return ret;
685
}
686
687
/*
688
* Verify the hello banner looks okay.
689
*/
690
static int verify_hello(struct ceph_connection *con)
691
{
692
if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
693
pr_err("connect to %s got bad banner\n",
694
ceph_pr_addr(&con->peer_addr));
695
con->error_msg = "protocol error, bad banner";
696
return -1;
697
}
698
return 0;
699
}
700
701
static int process_banner(struct ceph_connection *con)
702
{
703
struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
704
705
dout("process_banner on %p\n", con);
706
707
if (verify_hello(con) < 0)
708
return -1;
709
710
/*
711
* Make sure the other end is who we wanted. note that the other
712
* end may not yet know their ip address, so if it's 0.0.0.0, give
713
* them the benefit of the doubt.
714
*/
715
if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
716
sizeof(con->peer_addr)) != 0 &&
717
!(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
718
con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
719
pr_warn("wrong peer, want %s/%u, got %s/%u\n",
720
ceph_pr_addr(&con->peer_addr),
721
le32_to_cpu(con->peer_addr.nonce),
722
ceph_pr_addr(&con->v1.actual_peer_addr),
723
le32_to_cpu(con->v1.actual_peer_addr.nonce));
724
con->error_msg = "wrong peer at address";
725
return -1;
726
}
727
728
/*
729
* did we learn our address?
730
*/
731
if (ceph_addr_is_blank(my_addr)) {
732
memcpy(&my_addr->in_addr,
733
&con->v1.peer_addr_for_me.in_addr,
734
sizeof(con->v1.peer_addr_for_me.in_addr));
735
ceph_addr_set_port(my_addr, 0);
736
ceph_encode_my_addr(con->msgr);
737
dout("process_banner learned my addr is %s\n",
738
ceph_pr_addr(my_addr));
739
}
740
741
return 0;
742
}
743
744
static int process_connect(struct ceph_connection *con)
745
{
746
u64 sup_feat = from_msgr(con->msgr)->supported_features;
747
u64 req_feat = from_msgr(con->msgr)->required_features;
748
u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
749
int ret;
750
751
dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
752
753
if (con->v1.auth) {
754
int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
755
756
/*
757
* Any connection that defines ->get_authorizer()
758
* should also define ->add_authorizer_challenge() and
759
* ->verify_authorizer_reply().
760
*
761
* See get_connect_authorizer().
762
*/
763
if (con->v1.in_reply.tag ==
764
CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
765
ret = con->ops->add_authorizer_challenge(
766
con, con->v1.auth->authorizer_reply_buf, len);
767
if (ret < 0)
768
return ret;
769
770
con_out_kvec_reset(con);
771
__prepare_write_connect(con);
772
prepare_read_connect(con);
773
return 0;
774
}
775
776
if (len) {
777
ret = con->ops->verify_authorizer_reply(con);
778
if (ret < 0) {
779
con->error_msg = "bad authorize reply";
780
return ret;
781
}
782
}
783
}
784
785
switch (con->v1.in_reply.tag) {
786
case CEPH_MSGR_TAG_FEATURES:
787
pr_err("%s%lld %s feature set mismatch,"
788
" my %llx < server's %llx, missing %llx\n",
789
ENTITY_NAME(con->peer_name),
790
ceph_pr_addr(&con->peer_addr),
791
sup_feat, server_feat, server_feat & ~sup_feat);
792
con->error_msg = "missing required protocol features";
793
return -1;
794
795
case CEPH_MSGR_TAG_BADPROTOVER:
796
pr_err("%s%lld %s protocol version mismatch,"
797
" my %d != server's %d\n",
798
ENTITY_NAME(con->peer_name),
799
ceph_pr_addr(&con->peer_addr),
800
le32_to_cpu(con->v1.out_connect.protocol_version),
801
le32_to_cpu(con->v1.in_reply.protocol_version));
802
con->error_msg = "protocol version mismatch";
803
return -1;
804
805
case CEPH_MSGR_TAG_BADAUTHORIZER:
806
con->v1.auth_retry++;
807
dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
808
con->v1.auth_retry);
809
if (con->v1.auth_retry == 2) {
810
con->error_msg = "connect authorization failure";
811
return -1;
812
}
813
con_out_kvec_reset(con);
814
ret = prepare_write_connect(con);
815
if (ret < 0)
816
return ret;
817
prepare_read_connect(con);
818
break;
819
820
case CEPH_MSGR_TAG_RESETSESSION:
821
/*
822
* If we connected with a large connect_seq but the peer
823
* has no record of a session with us (no connection, or
824
* connect_seq == 0), they will send RESETSESION to indicate
825
* that they must have reset their session, and may have
826
* dropped messages.
827
*/
828
dout("process_connect got RESET peer seq %u\n",
829
le32_to_cpu(con->v1.in_reply.connect_seq));
830
pr_info("%s%lld %s session reset\n",
831
ENTITY_NAME(con->peer_name),
832
ceph_pr_addr(&con->peer_addr));
833
ceph_con_reset_session(con);
834
con_out_kvec_reset(con);
835
ret = prepare_write_connect(con);
836
if (ret < 0)
837
return ret;
838
prepare_read_connect(con);
839
840
/* Tell ceph about it. */
841
mutex_unlock(&con->mutex);
842
if (con->ops->peer_reset)
843
con->ops->peer_reset(con);
844
mutex_lock(&con->mutex);
845
if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
846
return -EAGAIN;
847
break;
848
849
case CEPH_MSGR_TAG_RETRY_SESSION:
850
/*
851
* If we sent a smaller connect_seq than the peer has, try
852
* again with a larger value.
853
*/
854
dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
855
le32_to_cpu(con->v1.out_connect.connect_seq),
856
le32_to_cpu(con->v1.in_reply.connect_seq));
857
con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
858
con_out_kvec_reset(con);
859
ret = prepare_write_connect(con);
860
if (ret < 0)
861
return ret;
862
prepare_read_connect(con);
863
break;
864
865
case CEPH_MSGR_TAG_RETRY_GLOBAL:
866
/*
867
* If we sent a smaller global_seq than the peer has, try
868
* again with a larger value.
869
*/
870
dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
871
con->v1.peer_global_seq,
872
le32_to_cpu(con->v1.in_reply.global_seq));
873
ceph_get_global_seq(con->msgr,
874
le32_to_cpu(con->v1.in_reply.global_seq));
875
con_out_kvec_reset(con);
876
ret = prepare_write_connect(con);
877
if (ret < 0)
878
return ret;
879
prepare_read_connect(con);
880
break;
881
882
case CEPH_MSGR_TAG_SEQ:
883
case CEPH_MSGR_TAG_READY:
884
if (req_feat & ~server_feat) {
885
pr_err("%s%lld %s protocol feature mismatch,"
886
" my required %llx > server's %llx, need %llx\n",
887
ENTITY_NAME(con->peer_name),
888
ceph_pr_addr(&con->peer_addr),
889
req_feat, server_feat, req_feat & ~server_feat);
890
con->error_msg = "missing required protocol features";
891
return -1;
892
}
893
894
WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
895
con->state = CEPH_CON_S_OPEN;
896
con->v1.auth_retry = 0; /* we authenticated; clear flag */
897
con->v1.peer_global_seq =
898
le32_to_cpu(con->v1.in_reply.global_seq);
899
con->v1.connect_seq++;
900
con->peer_features = server_feat;
901
dout("process_connect got READY gseq %d cseq %d (%d)\n",
902
con->v1.peer_global_seq,
903
le32_to_cpu(con->v1.in_reply.connect_seq),
904
con->v1.connect_seq);
905
WARN_ON(con->v1.connect_seq !=
906
le32_to_cpu(con->v1.in_reply.connect_seq));
907
908
if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
909
ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
910
911
con->delay = 0; /* reset backoff memory */
912
913
if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
914
prepare_write_seq(con);
915
prepare_read_seq(con);
916
} else {
917
prepare_read_tag(con);
918
}
919
break;
920
921
case CEPH_MSGR_TAG_WAIT:
922
/*
923
* If there is a connection race (we are opening
924
* connections to each other), one of us may just have
925
* to WAIT. This shouldn't happen if we are the
926
* client.
927
*/
928
con->error_msg = "protocol error, got WAIT as client";
929
return -1;
930
931
default:
932
con->error_msg = "protocol error, garbage tag during connect";
933
return -1;
934
}
935
return 0;
936
}
937
938
/*
939
* read (part of) an ack
940
*/
941
static int read_partial_ack(struct ceph_connection *con)
942
{
943
int size = sizeof(con->v1.in_temp_ack);
944
int end = size;
945
946
return read_partial(con, end, size, &con->v1.in_temp_ack);
947
}
948
949
/*
950
* We can finally discard anything that's been acked.
951
*/
952
static void process_ack(struct ceph_connection *con)
953
{
954
u64 ack = le64_to_cpu(con->v1.in_temp_ack);
955
956
if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
957
ceph_con_discard_sent(con, ack);
958
else
959
ceph_con_discard_requeued(con, ack);
960
961
prepare_read_tag(con);
962
}
963
964
/*
 * Fill @section up to @sec_len bytes from the socket; section->iov_len
 * records how much has arrived so far.  Once the section is complete its
 * contents are folded into *@crc.
 *
 * Returns 1 when the section is complete, otherwise the <= 0 result of
 * ceph_tcp_recvmsg().
 */
static int read_partial_message_chunk(struct ceph_connection *con,
				      struct kvec *section,
				      unsigned int sec_len, u32 *crc)
{
	BUG_ON(!section);

	while (section->iov_len < sec_len) {
		int left;
		int got;

		BUG_ON(section->iov_base == NULL);

		left = sec_len - section->iov_len;
		got = ceph_tcp_recvmsg(con->sock,
				       (char *)section->iov_base +
				       section->iov_len, left);
		if (got <= 0)
			return got;

		section->iov_len += got;
	}

	if (section->iov_len == sec_len)
		*crc = crc32c(*crc, section->iov_base, section->iov_len);

	return 1;
}
986
987
/*
 * Same as read_partial_message_chunk(), except that *@crc is cleared on
 * every call before the chunk is read.
 */
static inline int read_partial_message_section(struct ceph_connection *con,
					       struct kvec *section,
					       unsigned int sec_len, u32 *crc)
{
	*crc = 0;

	return read_partial_message_chunk(con, section, sec_len, crc);
}
994
995
/*
 * Receive the remainder of the current sparse-read extent
 * (cursor->sr_resid bytes) into the message's data pages, folding the
 * received bytes into *@crc.  With the RXBOUNCE option the data is
 * received into the connection's bounce page first and copied to the
 * destination page afterwards.
 *
 * Returns 1 when the extent is complete, the <= 0 result of
 * ceph_tcp_recvpage() when not, or -ENOMEM if the bounce page cannot be
 * allocated.
 */
static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
	bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);

	/* the bounce page is allocated on first use */
	if (do_bounce && unlikely(!con->bounce_page)) {
		con->bounce_page = alloc_page(GFP_NOIO);
		if (!con->bounce_page) {
			pr_err("failed to allocate bounce page\n");
			return -ENOMEM;
		}
	}

	while (cursor->sr_resid > 0) {
		struct page *dst, *rx;
		size_t off, len;
		int got;

		dst = ceph_msg_data_next(cursor, &off, &len);
		rx = do_bounce ? con->bounce_page : dst;

		/* clamp to what remains in extent */
		len = min_t(int, len, cursor->sr_resid);
		got = ceph_tcp_recvpage(con->sock, rx, (int)off, len);
		if (got <= 0)
			return got;

		*crc = ceph_crc32c_page(*crc, rx, off, got);
		ceph_msg_data_advance(cursor, (size_t)got);
		cursor->sr_resid -= got;
		if (do_bounce)
			memcpy_page(dst, off, rx, off, got);
	}

	return 1;
}
1029
1030
static int read_partial_sparse_msg_data(struct ceph_connection *con)
1031
{
1032
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1034
u32 crc = 0;
1035
int ret = 1;
1036
1037
if (do_datacrc)
1038
crc = con->in_data_crc;
1039
1040
while (cursor->total_resid) {
1041
if (con->v1.in_sr_kvec.iov_base)
1042
ret = read_partial_message_chunk(con,
1043
&con->v1.in_sr_kvec,
1044
con->v1.in_sr_len,
1045
&crc);
1046
else if (cursor->sr_resid > 0)
1047
ret = read_partial_sparse_msg_extent(con, &crc);
1048
if (ret <= 0)
1049
break;
1050
1051
memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
1052
ret = con->ops->sparse_read(con, cursor,
1053
(char **)&con->v1.in_sr_kvec.iov_base);
1054
if (ret <= 0) {
1055
ret = ret ? ret : 1; /* must return > 0 to indicate success */
1056
break;
1057
}
1058
con->v1.in_sr_len = ret;
1059
}
1060
1061
if (do_datacrc)
1062
con->in_data_crc = crc;
1063
1064
return ret;
1065
}
1066
1067
static int read_partial_msg_data(struct ceph_connection *con)
1068
{
1069
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1070
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1071
struct page *page;
1072
size_t page_offset;
1073
size_t length;
1074
u32 crc = 0;
1075
int ret;
1076
1077
if (do_datacrc)
1078
crc = con->in_data_crc;
1079
while (cursor->total_resid) {
1080
if (!cursor->resid) {
1081
ceph_msg_data_advance(cursor, 0);
1082
continue;
1083
}
1084
1085
page = ceph_msg_data_next(cursor, &page_offset, &length);
1086
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1087
if (ret <= 0) {
1088
if (do_datacrc)
1089
con->in_data_crc = crc;
1090
1091
return ret;
1092
}
1093
1094
if (do_datacrc)
1095
crc = ceph_crc32c_page(crc, page, page_offset, ret);
1096
ceph_msg_data_advance(cursor, (size_t)ret);
1097
}
1098
if (do_datacrc)
1099
con->in_data_crc = crc;
1100
1101
return 1; /* must return > 0 to indicate success */
1102
}
1103
1104
static int read_partial_msg_data_bounce(struct ceph_connection *con)
1105
{
1106
struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1107
struct page *page;
1108
size_t off, len;
1109
u32 crc;
1110
int ret;
1111
1112
if (unlikely(!con->bounce_page)) {
1113
con->bounce_page = alloc_page(GFP_NOIO);
1114
if (!con->bounce_page) {
1115
pr_err("failed to allocate bounce page\n");
1116
return -ENOMEM;
1117
}
1118
}
1119
1120
crc = con->in_data_crc;
1121
while (cursor->total_resid) {
1122
if (!cursor->resid) {
1123
ceph_msg_data_advance(cursor, 0);
1124
continue;
1125
}
1126
1127
page = ceph_msg_data_next(cursor, &off, &len);
1128
ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1129
if (ret <= 0) {
1130
con->in_data_crc = crc;
1131
return ret;
1132
}
1133
1134
crc = crc32c(crc, page_address(con->bounce_page), ret);
1135
memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1136
1137
ceph_msg_data_advance(cursor, ret);
1138
}
1139
con->in_data_crc = crc;
1140
1141
return 1; /* must return > 0 to indicate success */
1142
}
1143
1144
/*
1145
* read (part of) a message.
1146
*/
1147
static int read_partial_message(struct ceph_connection *con)
1148
{
1149
struct ceph_msg *m = con->in_msg;
1150
int size;
1151
int end;
1152
int ret;
1153
unsigned int front_len, middle_len, data_len;
1154
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1155
bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1156
u64 seq;
1157
u32 crc;
1158
1159
dout("read_partial_message con %p msg %p\n", con, m);
1160
1161
/* header */
1162
size = sizeof(con->v1.in_hdr);
1163
end = size;
1164
ret = read_partial(con, end, size, &con->v1.in_hdr);
1165
if (ret <= 0)
1166
return ret;
1167
1168
crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1169
if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1170
pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1171
crc, con->v1.in_hdr.crc);
1172
return -EBADMSG;
1173
}
1174
1175
front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1176
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1177
return -EIO;
1178
middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1179
if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1180
return -EIO;
1181
data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1182
if (data_len > CEPH_MSG_MAX_DATA_LEN)
1183
return -EIO;
1184
1185
/* verify seq# */
1186
seq = le64_to_cpu(con->v1.in_hdr.seq);
1187
if ((s64)seq - (s64)con->in_seq < 1) {
1188
pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1189
ENTITY_NAME(con->peer_name),
1190
ceph_pr_addr(&con->peer_addr),
1191
seq, con->in_seq + 1);
1192
con->v1.in_base_pos = -front_len - middle_len - data_len -
1193
sizeof_footer(con);
1194
con->v1.in_tag = CEPH_MSGR_TAG_READY;
1195
return 1;
1196
} else if ((s64)seq - (s64)con->in_seq > 1) {
1197
pr_err("read_partial_message bad seq %lld expected %lld\n",
1198
seq, con->in_seq + 1);
1199
con->error_msg = "bad message sequence # for incoming message";
1200
return -EBADE;
1201
}
1202
1203
/* allocate message? */
1204
if (!con->in_msg) {
1205
int skip = 0;
1206
1207
dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1208
front_len, data_len);
1209
ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1210
if (ret < 0)
1211
return ret;
1212
1213
BUG_ON((!con->in_msg) ^ skip);
1214
if (skip) {
1215
/* skip this message */
1216
dout("alloc_msg said skip message\n");
1217
con->v1.in_base_pos = -front_len - middle_len -
1218
data_len - sizeof_footer(con);
1219
con->v1.in_tag = CEPH_MSGR_TAG_READY;
1220
con->in_seq++;
1221
return 1;
1222
}
1223
1224
BUG_ON(!con->in_msg);
1225
BUG_ON(con->in_msg->con != con);
1226
m = con->in_msg;
1227
m->front.iov_len = 0; /* haven't read it yet */
1228
if (m->middle)
1229
m->middle->vec.iov_len = 0;
1230
1231
/* prepare for data payload, if any */
1232
1233
if (data_len)
1234
prepare_message_data(con->in_msg, data_len);
1235
}
1236
1237
/* front */
1238
ret = read_partial_message_section(con, &m->front, front_len,
1239
&con->in_front_crc);
1240
if (ret <= 0)
1241
return ret;
1242
1243
/* middle */
1244
if (m->middle) {
1245
ret = read_partial_message_section(con, &m->middle->vec,
1246
middle_len,
1247
&con->in_middle_crc);
1248
if (ret <= 0)
1249
return ret;
1250
}
1251
1252
/* (page) data */
1253
if (data_len) {
1254
if (!m->num_data_items)
1255
return -EIO;
1256
1257
if (m->sparse_read_total)
1258
ret = read_partial_sparse_msg_data(con);
1259
else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1260
ret = read_partial_msg_data_bounce(con);
1261
else
1262
ret = read_partial_msg_data(con);
1263
if (ret <= 0)
1264
return ret;
1265
}
1266
1267
/* footer */
1268
size = sizeof_footer(con);
1269
end += size;
1270
ret = read_partial(con, end, size, &m->footer);
1271
if (ret <= 0)
1272
return ret;
1273
1274
if (!need_sign) {
1275
m->footer.flags = m->old_footer.flags;
1276
m->footer.sig = 0;
1277
}
1278
1279
dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1280
m, front_len, m->footer.front_crc, middle_len,
1281
m->footer.middle_crc, data_len, m->footer.data_crc);
1282
1283
/* crc ok? */
1284
if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1285
pr_err("read_partial_message %p front crc %u != exp. %u\n",
1286
m, con->in_front_crc, m->footer.front_crc);
1287
return -EBADMSG;
1288
}
1289
if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1290
pr_err("read_partial_message %p middle crc %u != exp %u\n",
1291
m, con->in_middle_crc, m->footer.middle_crc);
1292
return -EBADMSG;
1293
}
1294
if (do_datacrc &&
1295
(m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1296
con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1297
pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1298
con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1299
return -EBADMSG;
1300
}
1301
1302
if (need_sign && con->ops->check_message_signature &&
1303
con->ops->check_message_signature(m)) {
1304
pr_err("read_partial_message %p signature check failed\n", m);
1305
return -EBADMSG;
1306
}
1307
1308
return 1; /* done! */
1309
}
1310
1311
static int read_keepalive_ack(struct ceph_connection *con)
1312
{
1313
struct ceph_timespec ceph_ts;
1314
size_t size = sizeof(ceph_ts);
1315
int ret = read_partial(con, size, size, &ceph_ts);
1316
if (ret <= 0)
1317
return ret;
1318
ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1319
prepare_read_tag(con);
1320
return 1;
1321
}
1322
1323
/*
1324
* Read what we can from the socket.
1325
*/
1326
int ceph_con_v1_try_read(struct ceph_connection *con)
1327
{
1328
int ret = -1;
1329
1330
more:
1331
dout("try_read start %p state %d\n", con, con->state);
1332
if (con->state != CEPH_CON_S_V1_BANNER &&
1333
con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1334
con->state != CEPH_CON_S_OPEN)
1335
return 0;
1336
1337
BUG_ON(!con->sock);
1338
1339
dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1340
con->v1.in_base_pos);
1341
1342
if (con->state == CEPH_CON_S_V1_BANNER) {
1343
ret = read_partial_banner(con);
1344
if (ret <= 0)
1345
goto out;
1346
ret = process_banner(con);
1347
if (ret < 0)
1348
goto out;
1349
1350
con->state = CEPH_CON_S_V1_CONNECT_MSG;
1351
1352
/*
1353
* Received banner is good, exchange connection info.
1354
* Do not reset out_kvec, as sending our banner raced
1355
* with receiving peer banner after connect completed.
1356
*/
1357
ret = prepare_write_connect(con);
1358
if (ret < 0)
1359
goto out;
1360
prepare_read_connect(con);
1361
1362
/* Send connection info before awaiting response */
1363
goto out;
1364
}
1365
1366
if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1367
ret = read_partial_connect(con);
1368
if (ret <= 0)
1369
goto out;
1370
ret = process_connect(con);
1371
if (ret < 0)
1372
goto out;
1373
goto more;
1374
}
1375
1376
WARN_ON(con->state != CEPH_CON_S_OPEN);
1377
1378
if (con->v1.in_base_pos < 0) {
1379
/*
1380
* skipping + discarding content.
1381
*/
1382
ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1383
if (ret <= 0)
1384
goto out;
1385
dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1386
con->v1.in_base_pos += ret;
1387
if (con->v1.in_base_pos)
1388
goto more;
1389
}
1390
if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1391
/*
1392
* what's next?
1393
*/
1394
ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1395
if (ret <= 0)
1396
goto out;
1397
dout("try_read got tag %d\n", con->v1.in_tag);
1398
switch (con->v1.in_tag) {
1399
case CEPH_MSGR_TAG_MSG:
1400
prepare_read_message(con);
1401
break;
1402
case CEPH_MSGR_TAG_ACK:
1403
prepare_read_ack(con);
1404
break;
1405
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1406
prepare_read_keepalive_ack(con);
1407
break;
1408
case CEPH_MSGR_TAG_CLOSE:
1409
ceph_con_close_socket(con);
1410
con->state = CEPH_CON_S_CLOSED;
1411
goto out;
1412
default:
1413
goto bad_tag;
1414
}
1415
}
1416
if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1417
ret = read_partial_message(con);
1418
if (ret <= 0) {
1419
switch (ret) {
1420
case -EBADMSG:
1421
con->error_msg = "bad crc/signature";
1422
fallthrough;
1423
case -EBADE:
1424
ret = -EIO;
1425
break;
1426
case -EIO:
1427
con->error_msg = "io error";
1428
break;
1429
}
1430
goto out;
1431
}
1432
if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1433
goto more;
1434
ceph_con_process_message(con);
1435
if (con->state == CEPH_CON_S_OPEN)
1436
prepare_read_tag(con);
1437
goto more;
1438
}
1439
if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1440
con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1441
/*
1442
* the final handshake seq exchange is semantically
1443
* equivalent to an ACK
1444
*/
1445
ret = read_partial_ack(con);
1446
if (ret <= 0)
1447
goto out;
1448
process_ack(con);
1449
goto more;
1450
}
1451
if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1452
ret = read_keepalive_ack(con);
1453
if (ret <= 0)
1454
goto out;
1455
goto more;
1456
}
1457
1458
out:
1459
dout("try_read done on %p ret %d\n", con, ret);
1460
return ret;
1461
1462
bad_tag:
1463
pr_err("try_read bad tag %d\n", con->v1.in_tag);
1464
con->error_msg = "protocol error, garbage tag";
1465
ret = -1;
1466
goto out;
1467
}
1468
1469
/*
1470
* Write something to the socket. Called in a worker thread when the
1471
* socket appears to be writeable and we have something ready to send.
1472
*/
1473
int ceph_con_v1_try_write(struct ceph_connection *con)
1474
{
1475
int ret = 1;
1476
1477
dout("try_write start %p state %d\n", con, con->state);
1478
if (con->state != CEPH_CON_S_PREOPEN &&
1479
con->state != CEPH_CON_S_V1_BANNER &&
1480
con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1481
con->state != CEPH_CON_S_OPEN)
1482
return 0;
1483
1484
/* open the socket first? */
1485
if (con->state == CEPH_CON_S_PREOPEN) {
1486
BUG_ON(con->sock);
1487
con->state = CEPH_CON_S_V1_BANNER;
1488
1489
con_out_kvec_reset(con);
1490
prepare_write_banner(con);
1491
prepare_read_banner(con);
1492
1493
BUG_ON(con->in_msg);
1494
con->v1.in_tag = CEPH_MSGR_TAG_READY;
1495
dout("try_write initiating connect on %p new state %d\n",
1496
con, con->state);
1497
ret = ceph_tcp_connect(con);
1498
if (ret < 0) {
1499
con->error_msg = "connect error";
1500
goto out;
1501
}
1502
}
1503
1504
more:
1505
dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1506
BUG_ON(!con->sock);
1507
1508
/* kvec data queued? */
1509
if (con->v1.out_kvec_left) {
1510
ret = write_partial_kvec(con);
1511
if (ret <= 0)
1512
goto out;
1513
}
1514
if (con->v1.out_skip) {
1515
ret = write_partial_skip(con);
1516
if (ret <= 0)
1517
goto out;
1518
}
1519
1520
/* msg pages? */
1521
if (con->out_msg) {
1522
if (con->v1.out_msg_done) {
1523
ceph_msg_put(con->out_msg);
1524
con->out_msg = NULL; /* we're done with this one */
1525
goto do_next;
1526
}
1527
1528
ret = write_partial_message_data(con);
1529
if (ret == 1)
1530
goto more; /* we need to send the footer, too! */
1531
if (ret == 0)
1532
goto out;
1533
if (ret < 0) {
1534
dout("try_write write_partial_message_data err %d\n",
1535
ret);
1536
goto out;
1537
}
1538
}
1539
1540
do_next:
1541
if (con->state == CEPH_CON_S_OPEN) {
1542
if (ceph_con_flag_test_and_clear(con,
1543
CEPH_CON_F_KEEPALIVE_PENDING)) {
1544
prepare_write_keepalive(con);
1545
goto more;
1546
}
1547
/* is anything else pending? */
1548
if (!list_empty(&con->out_queue)) {
1549
prepare_write_message(con);
1550
goto more;
1551
}
1552
if (con->in_seq > con->in_seq_acked) {
1553
prepare_write_ack(con);
1554
goto more;
1555
}
1556
}
1557
1558
/* Nothing to do! */
1559
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1560
dout("try_write nothing else to write.\n");
1561
ret = 0;
1562
out:
1563
dout("try_write done on %p ret %d\n", con, ret);
1564
return ret;
1565
}
1566
1567
void ceph_con_v1_revoke(struct ceph_connection *con)
1568
{
1569
struct ceph_msg *msg = con->out_msg;
1570
1571
WARN_ON(con->v1.out_skip);
1572
/* footer */
1573
if (con->v1.out_msg_done) {
1574
con->v1.out_skip += con_out_kvec_skip(con);
1575
} else {
1576
WARN_ON(!msg->data_length);
1577
con->v1.out_skip += sizeof_footer(con);
1578
}
1579
/* data, middle, front */
1580
if (msg->data_length)
1581
con->v1.out_skip += msg->cursor.total_resid;
1582
if (msg->middle)
1583
con->v1.out_skip += con_out_kvec_skip(con);
1584
con->v1.out_skip += con_out_kvec_skip(con);
1585
1586
dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1587
con->v1.out_kvec_bytes, con->v1.out_skip);
1588
}
1589
1590
void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1591
{
1592
unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1593
unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1594
unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1595
1596
/* skip rest of message */
1597
con->v1.in_base_pos = con->v1.in_base_pos -
1598
sizeof(struct ceph_msg_header) -
1599
front_len -
1600
middle_len -
1601
data_len -
1602
sizeof(struct ceph_msg_footer);
1603
1604
con->v1.in_tag = CEPH_MSGR_TAG_READY;
1605
con->in_seq++;
1606
1607
dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1608
}
1609
1610
bool ceph_con_v1_opened(struct ceph_connection *con)
1611
{
1612
return con->v1.connect_seq;
1613
}
1614
1615
void ceph_con_v1_reset_session(struct ceph_connection *con)
1616
{
1617
con->v1.connect_seq = 0;
1618
con->v1.peer_global_seq = 0;
1619
}
1620
1621
void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1622
{
1623
con->v1.out_skip = 0;
1624
}
1625
1626