GitHub Repository: torvalds/linux
Path: blob/master/fs/afs/rxrpc.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#include <linux/slab.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "internal.h"
#include "afs_cm.h"
#include "protocol_yfs.h"
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>

struct workqueue_struct *afs_async_calls;

static void afs_deferred_free_worker(struct work_struct *work);
static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID);
static void afs_rx_notify_oob(struct sock *sk, struct sk_buff *oob);
static int afs_deliver_cm_op_id(struct afs_call *);

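/*
 * Hooks run by rxrpc to notify kAFS of socket activity.  They are invoked in
 * contexts that can't sleep (afs_wake_up_async_call(), for instance, runs
 * under the call notify spinlock), so anything substantial is punted to a
 * workqueue.
 */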
static const struct rxrpc_kernel_ops afs_rxrpc_callback_ops = {
	.notify_new_call	= afs_rx_new_call,
	.discard_new_call	= afs_rx_discard_new_call,
	.user_attach_call	= afs_rx_attach,
	.notify_oob		= afs_rx_notify_oob,
};

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
};

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(struct afs_net *net)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	ret = sock_create_kern(net->net, AF_RXRPC, SOCK_DGRAM, PF_INET6, &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;
	socket->sk->sk_user_data = net;

	/* bind the callback manager's address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family = AF_RXRPC;
	srx.srx_service = CM_SERVICE;
	srx.transport_type = SOCK_DGRAM;
	srx.transport_len = sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family = AF_INET6;
	srx.transport.sin6.sin6_port = htons(AFS_CM_PORT);

	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
	if (ret < 0)
		goto error_2;

	ret = rxrpc_sock_set_manage_response(socket->sk, true);
	if (ret < 0)
		goto error_2;

	ret = afs_create_token_key(net, socket);
	if (ret < 0)
		pr_err("Couldn't create RxGK CM key: %d\n", ret);

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret == -EADDRINUSE) {
		srx.transport.sin6.sin6_port = 0;
		ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	}
	if (ret < 0)
		goto error_2;

	srx.srx_service = YFS_CM_SERVICE;
	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	/* Ideally, we'd turn on service upgrade here, but we can't because
	 * OpenAFS is buggy and leaks the userStatus field from packet to
	 * packet and between FS packets and CB packets - so if we try to do an
	 * upgrade on an FS packet, OpenAFS will leak that into the CB packet
	 * it sends back to us.
	 */

	rxrpc_kernel_set_notifications(socket, &afs_rxrpc_callback_ops);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	net->socket = socket;
	afs_charge_preallocation(&net->charge_preallocation_work);
	_leave(" = 0");
	return 0;

error_2:
	sock_release(socket);
error_1:
	_leave(" = %d", ret);
	return ret;
}
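
/* afs_open_socket() is called from afs_net_init() as each network namespace
 * is brought up, so every afs_net owns its own cache manager socket.
 */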

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(struct afs_net *net)
{
	_enter("");

	kernel_listen(net->socket, 0);
	flush_workqueue(afs_async_calls);

	if (net->spare_incoming_call) {
		afs_put_call(net->spare_incoming_call);
		net->spare_incoming_call = NULL;
	}

	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
	wait_var_event(&net->nr_outstanding_calls,
		       !atomic_read(&net->nr_outstanding_calls));
	_debug("no outstanding calls");

	kernel_sock_shutdown(net->socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	net->socket->sk->sk_user_data = NULL;
	sock_release(net->socket);
	key_put(net->fs_cm_token_key);

	_debug("dework");
	_leave("");
}

/*
 * Allocate a call.
 */
static struct afs_call *afs_alloc_call(struct afs_net *net,
				       const struct afs_call_type *type,
				       gfp_t gfp)
{
	struct afs_call *call;
	int o;

	call = kzalloc(sizeof(*call), gfp);
	if (!call)
		return NULL;

	call->type = type;
	call->net = net;
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
	refcount_set(&call->ref, 1);
	INIT_WORK(&call->async_work, type->async_rx ?: afs_process_async_call);
	INIT_WORK(&call->work, call->type->work);
	INIT_WORK(&call->free_work, afs_deferred_free_worker);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->state_lock);
	call->iter = &call->def_iter;

	o = atomic_inc_return(&net->nr_outstanding_calls);
	trace_afs_call(call->debug_id, afs_call_trace_alloc, 1, o,
		       __builtin_return_address(0));
	return call;
}

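/*
 * Free up a call, releasing any remaining rxrpc call, running the call
 * type's destructor and dropping the server ref; when the last call is gone,
 * anyone waiting in afs_close_socket() for nr_outstanding_calls to drain is
 * woken.
 */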
static void afs_free_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	int o;

	ASSERT(!work_pending(&call->async_work));

	rxrpc_kernel_put_peer(call->peer);

	if (call->rxcall) {
		rxrpc_kernel_shutdown_call(net->socket, call->rxcall);
		rxrpc_kernel_put_call(net->socket, call->rxcall);
		call->rxcall = NULL;
	}
	if (call->type->destructor)
		call->type->destructor(call);

	afs_unuse_server_notime(call->net, call->server, afs_server_trace_unuse_call);
	kfree(call->request);

	o = atomic_read(&net->nr_outstanding_calls);
	trace_afs_call(call->debug_id, afs_call_trace_free, 0, o,
		       __builtin_return_address(0));
	kfree(call);

	o = atomic_dec_return(&net->nr_outstanding_calls);
	if (o == 0)
		wake_up_var(&net->nr_outstanding_calls);
}

/*
 * Dispose of a reference on a call.
 */
void afs_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	unsigned int debug_id = call->debug_id;
	bool zero;
	int r, o;

	zero = __refcount_dec_and_test(&call->ref, &r);
	o = atomic_read(&net->nr_outstanding_calls);
	trace_afs_call(debug_id, afs_call_trace_put, r - 1, o,
		       __builtin_return_address(0));
	if (zero)
		afs_free_call(call);
}

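/*
 * Worker for afs_deferred_put_call(): frees a call whose last reference was
 * dropped in a context that couldn't perform the cleanup directly.
 */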
static void afs_deferred_free_worker(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, free_work);

	afs_free_call(call);
}

/*
 * Dispose of a reference on a call, deferring the cleanup to a workqueue
 * to avoid lock recursion.
 */
void afs_deferred_put_call(struct afs_call *call)
{
	struct afs_net *net = call->net;
	unsigned int debug_id = call->debug_id;
	bool zero;
	int r, o;

	zero = __refcount_dec_and_test(&call->ref, &r);
	o = atomic_read(&net->nr_outstanding_calls);
	trace_afs_call(debug_id, afs_call_trace_put, r - 1, o,
		       __builtin_return_address(0));
	if (zero)
		schedule_work(&call->free_work);
}

/*
 * Queue the call for actual work.
 */
static void afs_queue_call_work(struct afs_call *call)
{
	if (call->type->work) {
		afs_get_call(call, afs_call_trace_work);
		if (!queue_work(afs_wq, &call->work))
			afs_put_call(call);
	}
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(struct afs_net *net,
				     const struct afs_call_type *type,
				     size_t request_size, size_t reply_max)
{
	struct afs_call *call;

	call = afs_alloc_call(net, type, GFP_NOFS);
	if (!call)
		goto nomem_call;

	if (request_size) {
		call->request_size = request_size;
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_max) {
		call->reply_max = reply_max;
		call->buffer = kmalloc(reply_max, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	afs_extract_to_buf(call, call->reply_max);
	call->operation_ID = type->op;
	init_waitqueue_head(&call->waitq);
	return call;

nomem_free:
	afs_put_call(call);
nomem_call:
	return NULL;
}
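
/* The caller marshals its fixed-size request into call->request before
 * dispatching the call; the reply is unmarshalled from call->buffer, which
 * holds up to reply_max bytes.
 */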

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}

/*
 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 */
static void afs_notify_end_request_tx(struct sock *sock,
				      struct rxrpc_call *rxcall,
				      unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_CL_REQUESTING, AFS_CALL_CL_AWAIT_REPLY);
}

/*
 * Initiate a call and synchronously queue up the parameters for dispatch. Any
 * error is stored into the call struct, which the caller must check for.
 */
void afs_make_call(struct afs_call *call, gfp_t gfp)
{
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	size_t len;
	s64 tx_total_len;
	int ret;

	_enter(",{%pISp+%u},", rxrpc_kernel_remote_addr(call->peer), call->service_id);

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("____MAKE %p{%s,%x} [%d]____",
	       call, call->type->name, key_serial(call->key),
	       atomic_read(&call->net->nr_outstanding_calls));

	trace_afs_make_call(call);

	/* Work out the length we're going to transmit. This is awkward for
	 * calls such as FS.StoreData where there's an extra injection of data
	 * after the initial fixed part.
	 */
	tx_total_len = call->request_size;
	if (call->write_iter)
		tx_total_len += iov_iter_count(call->write_iter);

	/* If the call is going to be asynchronous, we need an extra ref for
	 * the call to hold itself so the caller need not hang on to its ref.
	 */
	if (call->async) {
		afs_get_call(call, afs_call_trace_get);
		call->drop_ref = true;
	}

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(call->net->socket, call->peer, call->key,
					 (unsigned long)call,
					 tx_total_len,
					 call->max_lifespan,
					 gfp,
					 (call->async ?
					  afs_wake_up_async_call :
					  afs_wake_up_call_waiter),
					 call->service_id,
					 call->upgrade,
					 (call->intr ? RXRPC_PREINTERRUPTIBLE :
					  RXRPC_UNINTERRUPTIBLE),
					 call->debug_id);
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		call->error = ret;
		goto error_kill_call;
	}

	call->rxcall = rxcall;
	call->issue_time = ktime_get_real();

	/* send the request */
	iov[0].iov_base = call->request;
	iov[0].iov_len = call->request_size;

	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, iov, 1, call->request_size);
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_WAITALL | (call->write_iter ? MSG_MORE : 0);

	ret = rxrpc_kernel_send_data(call->net->socket, rxcall,
				     &msg, call->request_size,
				     afs_notify_end_request_tx);
	if (ret < 0)
		goto error_do_abort;

	if (call->write_iter) {
		msg.msg_iter = *call->write_iter;
		msg.msg_flags &= ~MSG_MORE;
		trace_afs_send_data(call, &msg);

		ret = rxrpc_kernel_send_data(call->net->socket,
					     call->rxcall, &msg,
					     iov_iter_count(&msg.msg_iter),
					     afs_notify_end_request_tx);
		*call->write_iter = msg.msg_iter;

		trace_afs_sent_data(call, &msg, ret);
		if (ret < 0)
			goto error_do_abort;
	}

	/* Note that at this point, we may have received the reply or an abort
	 * - and an asynchronous call may already have completed.
	 *
	 * afs_wait_for_call_to_complete(call)
	 * must be called to synchronously clean up.
	 */
	return;

error_do_abort:
	if (ret != -ECONNABORTED)
		rxrpc_kernel_abort_call(call->net->socket, rxcall,
					RX_USER_ABORT, ret,
					afs_abort_send_data_error);
	if (call->async) {
		afs_see_call(call, afs_call_trace_async_abort);
		return;
	}

	if (ret == -ECONNABORTED) {
		len = 0;
		iov_iter_kvec(&msg.msg_iter, ITER_DEST, NULL, 0, 0);
		rxrpc_kernel_recv_data(call->net->socket, rxcall,
				       &msg.msg_iter, &len, false,
				       &call->abort_code, &call->service_id);
		call->responded = true;
	}
	call->error = ret;
	trace_afs_call_done(call);
error_kill_call:
	if (call->async)
		afs_see_call(call, afs_call_trace_async_kill);
	if (call->type->immediate_cancel)
		call->type->immediate_cancel(call);

	/* We need to dispose of the extra ref we grabbed for an async call.
	 * The call, however, might be queued on afs_async_calls and we need to
	 * make sure we don't get any more notifications that might requeue it.
	 */
	if (call->rxcall)
		rxrpc_kernel_shutdown_call(call->net->socket, call->rxcall);
	if (call->async) {
		if (cancel_work_sync(&call->async_work))
			afs_put_call(call);
		afs_set_call_complete(call, ret, 0);
	}

	call->error = ret;
	call->state = AFS_CALL_COMPLETE;
	_leave(" = %d", ret);
}
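
/* A minimal sketch of how a synchronous client op uses the above, with
 * call->key and call->peer already set.  The afs_RXFSExample call type,
 * FSEXAMPLEOP opcode and single operand are invented for illustration; the
 * real users are the FS.*, VL.* and YFS.* operation wrappers:
 *
 *	call = afs_alloc_flat_call(net, &afs_RXFSExample, 8, 128);
 *	if (!call)
 *		return -ENOMEM;
 *	bp = call->request;
 *	*bp++ = htonl(FSEXAMPLEOP);
 *	*bp++ = htonl(param);
 *	afs_make_call(call, GFP_NOFS);
 *	afs_wait_for_call_to_complete(call);
 *	ret = call->error;
 *	afs_put_call(call);
 *
 * afs_make_call() stores any error in call->error rather than returning it,
 * which is why the caller checks only after waiting.
 */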

/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the server.
 */
static void afs_log_error(struct afs_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	m = max;
	if (m < 3) {
		max = m + 1;
		pr_notice("kAFS: Peer reported %s failure on %s [%pISp]\n",
			  msg, call->type->name,
			  rxrpc_kernel_remote_addr(call->peer));
	}
}

/*
 * deliver messages to a call
 */
void afs_deliver_to_call(struct afs_call *call)
{
	enum afs_call_state state;
	size_t len;
	u32 abort_code, remote_abort = 0;
	int ret;

	_enter("%s", call->type->name);

	while (state = READ_ONCE(call->state),
	       state == AFS_CALL_CL_AWAIT_REPLY ||
	       state == AFS_CALL_SV_AWAIT_OP_ID ||
	       state == AFS_CALL_SV_AWAIT_REQUEST ||
	       state == AFS_CALL_SV_AWAIT_ACK
	       ) {
		if (state == AFS_CALL_SV_AWAIT_ACK) {
			len = 0;
			iov_iter_kvec(&call->def_iter, ITER_DEST, NULL, 0, 0);
			ret = rxrpc_kernel_recv_data(call->net->socket,
						     call->rxcall, &call->def_iter,
						     &len, false, &remote_abort,
						     &call->service_id);
			trace_afs_receive_data(call, &call->def_iter, false, ret);

			if (ret == -EINPROGRESS || ret == -EAGAIN)
				return;
			if (ret < 0 || ret == 1) {
				if (ret == 1)
					ret = 0;
				goto call_complete;
			}
			return;
		}

		ret = call->type->deliver(call);
		state = READ_ONCE(call->state);
		if (ret == 0 && call->unmarshalling_error)
			ret = -EBADMSG;
		switch (ret) {
		case 0:
			call->responded = true;
			afs_queue_call_work(call);
			if (state == AFS_CALL_CL_PROC_REPLY) {
				if (call->op)
					set_bit(AFS_SERVER_FL_MAY_HAVE_CB,
						&call->op->server->flags);
				goto call_complete;
			}
			ASSERTCMP(state, >, AFS_CALL_CL_PROC_REPLY);
			goto done;
		case -EINPROGRESS:
		case -EAGAIN:
			goto out;
		case -ECONNABORTED:
			ASSERTCMP(state, ==, AFS_CALL_COMPLETE);
			call->responded = true;
			afs_log_error(call, call->abort_code);
			goto done;
		case -ENOTSUPP:
			call->responded = true;
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret,
						afs_abort_op_not_supported);
			goto local_abort;
		case -EIO:
			pr_err("kAFS: Call %u in bad state %u\n",
			       call->debug_id, state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			abort_code = RXGEN_CC_UNMARSHAL;
			if (state != AFS_CALL_CL_AWAIT_REPLY)
				abort_code = RXGEN_SS_UNMARSHAL;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret,
						afs_abort_unmarshal_error);
			goto local_abort;
		default:
			abort_code = RX_CALL_DEAD;
			rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						abort_code, ret,
						afs_abort_general_error);
			goto local_abort;
		}
	}

done:
	if (call->type->done)
		call->type->done(call);
out:
	_leave("");
	return;

local_abort:
	abort_code = 0;
call_complete:
	afs_set_call_complete(call, ret, remote_abort);
	goto done;
}
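
/* The deliver routines fed into the switch above return 0 when their message
 * has been fully consumed, -EINPROGRESS or -EAGAIN when more data is needed,
 * and any other error to abort the call.
 */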

/*
 * Wait synchronously for a call to complete.
 */
void afs_wait_for_call_to_complete(struct afs_call *call)
{
	bool rxrpc_complete = false;

	_enter("");

	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
		DECLARE_WAITQUEUE(myself, current);

		add_wait_queue(&call->waitq, &myself);
		for (;;) {
			set_current_state(TASK_UNINTERRUPTIBLE);

			/* deliver any messages that are in the queue */
			if (!afs_check_call_state(call, AFS_CALL_COMPLETE) &&
			    call->need_attention) {
				call->need_attention = false;
				__set_current_state(TASK_RUNNING);
				afs_deliver_to_call(call);
				continue;
			}

			if (afs_check_call_state(call, AFS_CALL_COMPLETE))
				break;

			if (!rxrpc_kernel_check_life(call->net->socket, call->rxcall)) {
				/* rxrpc terminated the call. */
				rxrpc_complete = true;
				break;
			}

			schedule();
		}

		remove_wait_queue(&call->waitq, &myself);
		__set_current_state(TASK_RUNNING);
	}

	if (!afs_check_call_state(call, AFS_CALL_COMPLETE)) {
		if (rxrpc_complete) {
			afs_set_call_complete(call, call->error, call->abort_code);
		} else {
			/* Kill off the call if it's still live. */
			_debug("call interrupted");
			if (rxrpc_kernel_abort_call(call->net->socket, call->rxcall,
						    RX_USER_ABORT, -EINTR,
						    afs_abort_interrupted))
				afs_set_call_complete(call, -EINTR, 0);
		}
	}
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	call->need_attention = true;
	wake_up(&call->waitq);
}

/*
 * Wake up an asynchronous call. The caller is holding the call notify
 * spinlock around this, so we can't call afs_put_call().
 */
static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
				   unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;
	int r;

	trace_afs_notify_call(rxcall, call);
	call->need_attention = true;

	if (__refcount_inc_not_zero(&call->ref, &r)) {
		trace_afs_call(call->debug_id, afs_call_trace_wake, r + 1,
			       atomic_read(&call->net->nr_outstanding_calls),
			       __builtin_return_address(0));

		if (!queue_work(afs_async_calls, &call->async_work))
			afs_deferred_put_call(call);
	}
}

/*
 * Perform I/O processing on an asynchronous call. The work item carries a ref
 * to the call struct that we either need to release or to pass on.
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call = container_of(work, struct afs_call, async_work);

	_enter("");

	if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
		call->need_attention = false;
		afs_deliver_to_call(call);
	}

	afs_put_call(call);
	_leave("");
}

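/*
 * Attach a preallocated afs_call to the incoming rxrpc call that has just
 * been accepted for it.
 */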
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * Charge the incoming call preallocation.
 */
void afs_charge_preallocation(struct work_struct *work)
{
	struct afs_net *net =
		container_of(work, struct afs_net, charge_preallocation_work);
	struct afs_call *call = net->spare_incoming_call;

	for (;;) {
		if (!call) {
			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
			if (!call)
				break;

			call->drop_ref = true;
			call->async = true;
			call->state = AFS_CALL_SV_AWAIT_OP_ID;
			init_waitqueue_head(&call->waitq);
			afs_extract_to_tmp(call);
		}

		if (rxrpc_kernel_charge_accept(net->socket,
					       afs_wake_up_async_call,
					       (unsigned long)call,
					       GFP_KERNEL,
					       call->debug_id) < 0)
			break;
		call = NULL;
	}
	net->spare_incoming_call = call;
}
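
/* rxrpc only takes delivery of an incoming call if an accept slot has been
 * charged in advance; a spare afs_call is kept ready here, and
 * afs_rx_new_call() requeues this worker each time a slot is consumed.
 */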

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = NULL;
	afs_put_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;
	struct afs_net *net = afs_sock2net(sk);

	call->peer = rxrpc_kernel_get_call_peer(sk->sk_socket, call->rxcall);
	call->server = afs_find_server(call->peer);
	if (!call->server)
		trace_afs_cm_no_server(call, rxrpc_kernel_remote_srx(call->peer));

	queue_work(afs_wq, &net->charge_preallocation_work);
}
797
798
/*
799
* Grab the operation ID from an incoming cache manager call. The socket
800
* buffer is discarded on error or if we don't yet have sufficient data.
801
*/
802
static int afs_deliver_cm_op_id(struct afs_call *call)
803
{
804
int ret;
805
806
_enter("{%zu}", iov_iter_count(call->iter));
807
808
/* the operation ID forms the first four bytes of the request data */
809
ret = afs_extract_data(call, true);
810
if (ret < 0)
811
return ret;
812
813
call->operation_ID = ntohl(call->tmp);
814
afs_set_call_state(call, AFS_CALL_SV_AWAIT_OP_ID, AFS_CALL_SV_AWAIT_REQUEST);
815
816
/* ask the cache manager to route the call (it'll change the call type
817
* if successful) */
818
if (!afs_cm_incoming_call(call))
819
return -ENOTSUPP;
820
821
call->security_ix = rxrpc_kernel_query_call_security(call->rxcall,
822
&call->service_id,
823
&call->enctype);
824
825
trace_afs_cb_call(call);
826
call->work.func = call->type->work;
827
828
/* pass responsibility for the remainder of this message off to the
829
* cache manager op */
830
return call->type->deliver(call);
831
}

/*
 * Advance the AFS call state when an RxRPC service call ends the transmit
 * phase.
 */
static void afs_notify_end_reply_tx(struct sock *sock,
				    struct rxrpc_call *rxcall,
				    unsigned long call_user_ID)
{
	struct afs_call *call = (struct afs_call *)call_user_ID;

	afs_set_call_state(call, AFS_CALL_SV_REPLYING, AFS_CALL_SV_AWAIT_ACK);
}
845
846
/*
847
* send an empty reply
848
*/
849
void afs_send_empty_reply(struct afs_call *call)
850
{
851
struct afs_net *net = call->net;
852
struct msghdr msg;
853
854
_enter("");
855
856
rxrpc_kernel_set_tx_length(net->socket, call->rxcall, 0);
857
858
msg.msg_name = NULL;
859
msg.msg_namelen = 0;
860
iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, NULL, 0, 0);
861
msg.msg_control = NULL;
862
msg.msg_controllen = 0;
863
msg.msg_flags = 0;
864
865
switch (rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, 0,
866
afs_notify_end_reply_tx)) {
867
case 0:
868
_leave(" [replied]");
869
return;
870
871
case -ENOMEM:
872
_debug("oom");
873
rxrpc_kernel_abort_call(net->socket, call->rxcall,
874
RXGEN_SS_MARSHAL, -ENOMEM,
875
afs_abort_oom);
876
fallthrough;
877
default:
878
_leave(" [error]");
879
return;
880
}
881
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
	struct afs_net *net = call->net;
	struct msghdr msg;
	struct kvec iov[1];
	int n;

	_enter("");

	rxrpc_kernel_set_tx_length(net->socket, call->rxcall, len);

	iov[0].iov_base = (void *) buf;
	iov[0].iov_len = len;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, iov, 1, len);
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	n = rxrpc_kernel_send_data(net->socket, call->rxcall, &msg, len,
				   afs_notify_end_reply_tx);
	if (n >= 0) {
		/* Success */
		_leave(" [replied]");
		return;
	}

	if (n == -ENOMEM) {
		_debug("oom");
		rxrpc_kernel_abort_call(net->socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM,
					afs_abort_oom);
	}
	_leave(" [error]");
}
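
/* Illustrative use from a cache manager op, with a made-up reply structure:
 *
 *	struct {
 *		__be32 result;
 *	} reply;
 *
 *	reply.result = htonl(0);
 *	afs_send_simple_reply(call, &reply, sizeof(reply));
 *
 * Ops that have nothing to return use afs_send_empty_reply(call) instead.
 */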

/*
 * Extract a piece of data from the received data socket buffers.
 */
int afs_extract_data(struct afs_call *call, bool want_more)
{
	struct afs_net *net = call->net;
	struct iov_iter *iter = call->iter;
	enum afs_call_state state;
	u32 remote_abort = 0;
	int ret;

	_enter("{%s,%zu,%zu},%d",
	       call->type->name, call->iov_len, iov_iter_count(iter), want_more);

	ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	trace_afs_receive_data(call, call->iter, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	state = READ_ONCE(call->state);
	if (ret == 1) {
		switch (state) {
		case AFS_CALL_CL_AWAIT_REPLY:
			afs_set_call_state(call, state, AFS_CALL_CL_PROC_REPLY);
			break;
		case AFS_CALL_SV_AWAIT_REQUEST:
			afs_set_call_state(call, state, AFS_CALL_SV_REPLYING);
			break;
		case AFS_CALL_COMPLETE:
			kdebug("prem complete %d", call->error);
			return afs_io_error(call, afs_io_error_extract);
		default:
			break;
		}
		return 0;
	}

	afs_set_call_complete(call, ret, remote_abort);
	return ret;
}
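
/* A minimal sketch of a deliver routine built on afs_extract_data(); the op
 * and the use of call->count are invented for illustration, but the
 * unmarshalling state machine mirrors the real cache manager handlers:
 *
 *	static int afs_deliver_example(struct afs_call *call)
 *	{
 *		int ret;
 *
 *		switch (call->unmarshall) {
 *		case 0:
 *			afs_extract_to_tmp(call);
 *			call->unmarshall++;
 *			fallthrough;
 *		case 1:
 *			ret = afs_extract_data(call, false);
 *			if (ret < 0)
 *				return ret;
 *			call->count = ntohl(call->tmp);
 *			call->unmarshall++;
 *			break;
 *		}
 *		return 0;
 *	}
 *
 * Returning ret straight back covers -EAGAIN (come back when more data has
 * arrived) as well as hard failure, matching the switch in
 * afs_deliver_to_call().
 */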

/*
 * Log protocol error production.
 */
noinline int afs_protocol_error(struct afs_call *call,
				enum afs_eproto_cause cause)
{
	trace_afs_protocol_error(call, cause);
	if (call)
		call->unmarshalling_error = true;
	return -EBADMSG;
}

/*
 * Wake up OOB notification processing.
 */
static void afs_rx_notify_oob(struct sock *sk, struct sk_buff *oob)
{
	struct afs_net *net = sk->sk_user_data;

	schedule_work(&net->rx_oob_work);
}