Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/rpc/clnt_dg.c
39564 views
1
/* $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
2
3
/*-
4
* SPDX-License-Identifier: BSD-3-Clause
5
*
6
* Copyright (c) 2009, Sun Microsystems, Inc.
7
* All rights reserved.
8
*
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are met:
11
* - Redistributions of source code must retain the above copyright notice,
12
* this list of conditions and the following disclaimer.
13
* - Redistributions in binary form must reproduce the above copyright notice,
14
* this list of conditions and the following disclaimer in the documentation
15
* and/or other materials provided with the distribution.
16
* - Neither the name of Sun Microsystems, Inc. nor the names of its
17
* contributors may be used to endorse or promote products derived
18
* from this software without specific prior written permission.
19
*
20
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
* POSSIBILITY OF SUCH DAMAGE.
31
*/
32
/*
33
* Copyright (c) 1986-1991 by Sun Microsystems Inc.
34
*/
35
36
/*
37
* Implements a connectionless client side RPC.
38
*/
39
40
#include <sys/param.h>
41
#include <sys/systm.h>
42
#include <sys/kernel.h>
43
#include <sys/lock.h>
44
#include <sys/malloc.h>
45
#include <sys/mbuf.h>
46
#include <sys/mutex.h>
47
#include <sys/pcpu.h>
48
#include <sys/proc.h>
49
#include <sys/socket.h>
50
#include <sys/socketvar.h>
51
#include <sys/time.h>
52
#include <sys/uio.h>
53
54
#include <net/vnet.h>
55
56
#include <rpc/rpc.h>
57
#include <rpc/rpc_com.h>
58
59
60
#ifdef _FREEFALL_CONFIG
61
/*
62
* Disable RPC exponential back-off for FreeBSD.org systems.
63
*/
64
#define RPC_MAX_BACKOFF 1 /* second */
65
#else
66
#define RPC_MAX_BACKOFF 30 /* seconds */
67
#endif
68
69
static bool_t time_not_ok(struct timeval *);
70
static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
71
rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
72
static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
73
static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
74
static void clnt_dg_abort(CLIENT *);
75
static bool_t clnt_dg_control(CLIENT *, u_int, void *);
76
static void clnt_dg_close(CLIENT *);
77
static void clnt_dg_destroy(CLIENT *);
78
static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
79
80
static const struct clnt_ops clnt_dg_ops = {
81
.cl_call = clnt_dg_call,
82
.cl_abort = clnt_dg_abort,
83
.cl_geterr = clnt_dg_geterr,
84
.cl_freeres = clnt_dg_freeres,
85
.cl_close = clnt_dg_close,
86
.cl_destroy = clnt_dg_destroy,
87
.cl_control = clnt_dg_control
88
};
89
90
static volatile uint32_t rpc_xid = 0;
91
92
/*
93
* A pending RPC request which awaits a reply. Requests which have
94
* received their reply will have cr_xid set to zero and cr_mrep to
95
* the mbuf chain of the reply.
96
*/
97
struct cu_request {
98
TAILQ_ENTRY(cu_request) cr_link;
99
CLIENT *cr_client; /* owner */
100
uint32_t cr_xid; /* XID of request */
101
struct mbuf *cr_mrep; /* reply received by upcall */
102
int cr_error; /* any error from upcall */
103
char cr_verf[MAX_AUTH_BYTES]; /* reply verf */
104
};
105
106
TAILQ_HEAD(cu_request_list, cu_request);
107
108
#define MCALL_MSG_SIZE 24
109
110
/*
111
* This structure is pointed to by the socket buffer's sb_upcallarg
112
* member. It is separate from the client private data to facilitate
113
* multiple clients sharing the same socket. The cs_lock mutex is used
114
* to protect all fields of this structure, the socket's receive
115
* buffer lock is used to ensure that exactly one of these
116
* structures is installed on the socket.
117
*/
118
struct cu_socket {
119
struct mtx cs_lock;
120
int cs_refs; /* Count of clients */
121
struct cu_request_list cs_pending; /* Requests awaiting replies */
122
int cs_upcallrefs; /* Refcnt of upcalls in prog.*/
123
};
124
125
static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
126
127
/*
128
* Private data kept per client handle
129
*/
130
struct cu_data {
131
int cu_threads; /* # threads in clnt_vc_call */
132
bool_t cu_closing; /* TRUE if we are closing */
133
bool_t cu_closed; /* TRUE if we are closed */
134
struct socket *cu_socket; /* connection socket */
135
bool_t cu_closeit; /* opened by library */
136
struct sockaddr_storage cu_raddr; /* remote address */
137
int cu_rlen;
138
struct timeval cu_wait; /* retransmit interval */
139
struct timeval cu_total; /* total time for the call */
140
struct rpc_err cu_error;
141
uint32_t cu_xid;
142
char cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
143
size_t cu_mcalllen;
144
size_t cu_sendsz; /* send size */
145
size_t cu_recvsz; /* recv size */
146
int cu_async;
147
int cu_connect; /* Use connect(). */
148
int cu_connected; /* Have done connect(). */
149
const char *cu_waitchan;
150
int cu_waitflag;
151
int cu_cwnd; /* congestion window */
152
int cu_sent; /* number of in-flight RPCs */
153
bool_t cu_cwnd_wait;
154
};
155
156
#define CWNDSCALE 256
157
#define MAXCWND (32 * CWNDSCALE)
158
159
/*
160
* Connection less client creation returns with client handle parameters.
161
* Default options are set, which the user can change using clnt_control().
162
* fd should be open and bound.
163
* NB: The rpch->cl_auth is initialized to null authentication.
164
* Caller may wish to set this something more useful.
165
*
166
* sendsz and recvsz are the maximum allowable packet sizes that can be
167
* sent and received. Normally they are the same, but they can be
168
* changed to improve the program efficiency and buffer allocation.
169
* If they are 0, use the transport default.
170
*
171
* If svcaddr is NULL, returns NULL.
172
*/
173
CLIENT *
174
clnt_dg_create(
175
struct socket *so,
176
struct sockaddr *svcaddr, /* servers address */
177
rpcprog_t program, /* program number */
178
rpcvers_t version, /* version number */
179
size_t sendsz, /* buffer recv size */
180
size_t recvsz) /* buffer send size */
181
{
182
CLIENT *cl = NULL; /* client handle */
183
struct cu_data *cu = NULL; /* private data */
184
struct cu_socket *cs = NULL;
185
struct sockbuf *sb;
186
struct timeval now;
187
struct rpc_msg call_msg;
188
struct __rpc_sockinfo si;
189
XDR xdrs;
190
int error;
191
uint32_t newxid;
192
193
if (svcaddr == NULL) {
194
rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
195
return (NULL);
196
}
197
198
if (!__rpc_socket2sockinfo(so, &si)) {
199
rpc_createerr.cf_stat = RPC_TLIERROR;
200
rpc_createerr.cf_error.re_errno = 0;
201
return (NULL);
202
}
203
204
/*
205
* Find the receive and the send size
206
*/
207
sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
208
recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
209
if ((sendsz == 0) || (recvsz == 0)) {
210
rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
211
rpc_createerr.cf_error.re_errno = 0;
212
return (NULL);
213
}
214
215
cl = mem_alloc(sizeof (CLIENT));
216
217
/*
218
* Should be multiple of 4 for XDR.
219
*/
220
sendsz = rounddown(sendsz + 3, 4);
221
recvsz = rounddown(recvsz + 3, 4);
222
cu = mem_alloc(sizeof (*cu));
223
cu->cu_threads = 0;
224
cu->cu_closing = FALSE;
225
cu->cu_closed = FALSE;
226
(void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
227
cu->cu_rlen = svcaddr->sa_len;
228
/* Other values can also be set through clnt_control() */
229
cu->cu_wait.tv_sec = 3; /* heuristically chosen */
230
cu->cu_wait.tv_usec = 0;
231
cu->cu_total.tv_sec = -1;
232
cu->cu_total.tv_usec = -1;
233
cu->cu_sendsz = sendsz;
234
cu->cu_recvsz = recvsz;
235
cu->cu_async = FALSE;
236
cu->cu_connect = FALSE;
237
cu->cu_connected = FALSE;
238
cu->cu_waitchan = "rpcrecv";
239
cu->cu_waitflag = 0;
240
cu->cu_cwnd = MAXCWND / 2;
241
cu->cu_sent = 0;
242
cu->cu_cwnd_wait = FALSE;
243
(void) getmicrotime(&now);
244
/* Clip at 28bits so that it will not wrap around. */
245
newxid = __RPC_GETXID(&now) & 0xfffffff;
246
atomic_cmpset_32(&rpc_xid, 0, newxid);
247
call_msg.rm_xid = atomic_fetchadd_32(&rpc_xid, 1);
248
call_msg.rm_call.cb_prog = program;
249
call_msg.rm_call.cb_vers = version;
250
xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
251
if (! xdr_callhdr(&xdrs, &call_msg)) {
252
rpc_createerr.cf_stat = RPC_CANTENCODEARGS; /* XXX */
253
rpc_createerr.cf_error.re_errno = 0;
254
goto err2;
255
}
256
cu->cu_mcalllen = XDR_GETPOS(&xdrs);
257
258
/*
259
* By default, closeit is always FALSE. It is users responsibility
260
* to do a close on it, else the user may use clnt_control
261
* to let clnt_destroy do it for him/her.
262
*/
263
cu->cu_closeit = FALSE;
264
cu->cu_socket = so;
265
error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
266
if (error != 0) {
267
rpc_createerr.cf_stat = RPC_FAILED;
268
rpc_createerr.cf_error.re_errno = error;
269
goto err2;
270
}
271
272
sb = &so->so_rcv;
273
SOCK_RECVBUF_LOCK(so);
274
recheck_socket:
275
if (sb->sb_upcall) {
276
if (sb->sb_upcall != clnt_dg_soupcall) {
277
SOCK_RECVBUF_UNLOCK(so);
278
printf("clnt_dg_create(): socket already has an incompatible upcall\n");
279
goto err2;
280
}
281
cs = (struct cu_socket *) sb->sb_upcallarg;
282
mtx_lock(&cs->cs_lock);
283
cs->cs_refs++;
284
mtx_unlock(&cs->cs_lock);
285
} else {
286
/*
287
* We are the first on this socket - allocate the
288
* structure and install it in the socket.
289
*/
290
SOCK_RECVBUF_UNLOCK(so);
291
cs = mem_alloc(sizeof(*cs));
292
SOCK_RECVBUF_LOCK(so);
293
if (sb->sb_upcall) {
294
/*
295
* We have lost a race with some other client.
296
*/
297
mem_free(cs, sizeof(*cs));
298
goto recheck_socket;
299
}
300
mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
301
cs->cs_refs = 1;
302
cs->cs_upcallrefs = 0;
303
TAILQ_INIT(&cs->cs_pending);
304
soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
305
}
306
SOCK_RECVBUF_UNLOCK(so);
307
308
cl->cl_refs = 1;
309
cl->cl_ops = &clnt_dg_ops;
310
cl->cl_private = (caddr_t)(void *)cu;
311
cl->cl_auth = authnone_create();
312
cl->cl_tp = NULL;
313
cl->cl_netid = NULL;
314
return (cl);
315
err2:
316
mem_free(cl, sizeof (CLIENT));
317
mem_free(cu, sizeof (*cu));
318
319
return (NULL);
320
}
321
322
static enum clnt_stat
323
clnt_dg_call(
324
CLIENT *cl, /* client handle */
325
struct rpc_callextra *ext, /* call metadata */
326
rpcproc_t proc, /* procedure number */
327
struct mbuf *args, /* pointer to args */
328
struct mbuf **resultsp, /* pointer to results */
329
struct timeval utimeout) /* seconds to wait before giving up */
330
{
331
struct cu_data *cu = (struct cu_data *)cl->cl_private;
332
struct cu_socket *cs;
333
struct rpc_timers *rt;
334
AUTH *auth;
335
struct rpc_err *errp;
336
enum clnt_stat stat;
337
XDR xdrs;
338
struct rpc_msg reply_msg;
339
bool_t ok;
340
int retrans; /* number of re-transmits so far */
341
int nrefreshes = 2; /* number of times to refresh cred */
342
struct timeval *tvp;
343
int timeout;
344
int retransmit_time;
345
int next_sendtime, starttime, rtt, time_waited, tv = 0;
346
struct sockaddr *sa;
347
uint32_t xid = 0;
348
struct mbuf *mreq = NULL, *results;
349
struct cu_request *cr;
350
int error;
351
352
cs = cu->cu_socket->so_rcv.sb_upcallarg;
353
cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
354
355
mtx_lock(&cs->cs_lock);
356
357
if (cu->cu_closing || cu->cu_closed) {
358
mtx_unlock(&cs->cs_lock);
359
free(cr, M_RPC);
360
return (RPC_CANTSEND);
361
}
362
cu->cu_threads++;
363
364
if (ext) {
365
auth = ext->rc_auth;
366
errp = &ext->rc_err;
367
} else {
368
auth = cl->cl_auth;
369
errp = &cu->cu_error;
370
}
371
372
cr->cr_client = cl;
373
cr->cr_mrep = NULL;
374
cr->cr_error = 0;
375
376
if (cu->cu_total.tv_usec == -1) {
377
tvp = &utimeout; /* use supplied timeout */
378
} else {
379
tvp = &cu->cu_total; /* use default timeout */
380
}
381
if (tvp->tv_sec || tvp->tv_usec)
382
timeout = tvtohz(tvp);
383
else
384
timeout = 0;
385
386
if (cu->cu_connect && !cu->cu_connected) {
387
mtx_unlock(&cs->cs_lock);
388
error = soconnect(cu->cu_socket,
389
(struct sockaddr *)&cu->cu_raddr, curthread);
390
mtx_lock(&cs->cs_lock);
391
if (error) {
392
errp->re_errno = error;
393
errp->re_status = stat = RPC_CANTSEND;
394
goto out;
395
}
396
cu->cu_connected = 1;
397
}
398
if (cu->cu_connected)
399
sa = NULL;
400
else
401
sa = (struct sockaddr *)&cu->cu_raddr;
402
time_waited = 0;
403
retrans = 0;
404
if (ext && ext->rc_timers) {
405
rt = ext->rc_timers;
406
if (!rt->rt_rtxcur)
407
rt->rt_rtxcur = tvtohz(&cu->cu_wait);
408
retransmit_time = next_sendtime = rt->rt_rtxcur;
409
} else {
410
rt = NULL;
411
retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
412
}
413
414
starttime = ticks;
415
416
call_again:
417
mtx_assert(&cs->cs_lock, MA_OWNED);
418
419
xid = atomic_fetchadd_32(&rpc_xid, 1);
420
421
send_again:
422
mtx_unlock(&cs->cs_lock);
423
424
mreq = m_gethdr(M_WAITOK, MT_DATA);
425
KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
426
bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
427
mreq->m_len = cu->cu_mcalllen;
428
429
/*
430
* The XID is the first thing in the request.
431
*/
432
*mtod(mreq, uint32_t *) = htonl(xid);
433
434
xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
435
436
if (cu->cu_async == TRUE && args == NULL)
437
goto get_reply;
438
439
if ((! XDR_PUTINT32(&xdrs, &proc)) ||
440
(! AUTH_MARSHALL(auth, xid, &xdrs,
441
m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
442
errp->re_status = stat = RPC_CANTENCODEARGS;
443
mtx_lock(&cs->cs_lock);
444
goto out;
445
}
446
mreq->m_pkthdr.len = m_length(mreq, NULL);
447
448
cr->cr_xid = xid;
449
mtx_lock(&cs->cs_lock);
450
451
/*
452
* Try to get a place in the congestion window.
453
*/
454
while (cu->cu_sent >= cu->cu_cwnd) {
455
cu->cu_cwnd_wait = TRUE;
456
error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
457
cu->cu_waitflag, "rpccwnd", 0);
458
if (error) {
459
errp->re_errno = error;
460
if (error == EINTR || error == ERESTART)
461
errp->re_status = stat = RPC_INTR;
462
else
463
errp->re_status = stat = RPC_CANTSEND;
464
goto out;
465
}
466
}
467
cu->cu_sent += CWNDSCALE;
468
469
TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
470
mtx_unlock(&cs->cs_lock);
471
472
/*
473
* sosend consumes mreq.
474
*/
475
error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
476
mreq = NULL;
477
478
/*
479
* sub-optimal code appears here because we have
480
* some clock time to spare while the packets are in flight.
481
* (We assume that this is actually only executed once.)
482
*/
483
reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
484
reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
485
reply_msg.acpted_rply.ar_verf.oa_length = 0;
486
reply_msg.acpted_rply.ar_results.where = NULL;
487
reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
488
489
mtx_lock(&cs->cs_lock);
490
if (error) {
491
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
492
errp->re_errno = error;
493
errp->re_status = stat = RPC_CANTSEND;
494
cu->cu_sent -= CWNDSCALE;
495
if (cu->cu_cwnd_wait) {
496
cu->cu_cwnd_wait = FALSE;
497
wakeup(&cu->cu_cwnd_wait);
498
}
499
goto out;
500
}
501
502
/*
503
* Check to see if we got an upcall while waiting for the
504
* lock.
505
*/
506
if (cr->cr_error) {
507
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
508
errp->re_errno = cr->cr_error;
509
errp->re_status = stat = RPC_CANTRECV;
510
cu->cu_sent -= CWNDSCALE;
511
if (cu->cu_cwnd_wait) {
512
cu->cu_cwnd_wait = FALSE;
513
wakeup(&cu->cu_cwnd_wait);
514
}
515
goto out;
516
}
517
if (cr->cr_mrep) {
518
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
519
cu->cu_sent -= CWNDSCALE;
520
if (cu->cu_cwnd_wait) {
521
cu->cu_cwnd_wait = FALSE;
522
wakeup(&cu->cu_cwnd_wait);
523
}
524
goto got_reply;
525
}
526
527
/*
528
* Hack to provide rpc-based message passing
529
*/
530
if (timeout == 0) {
531
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
532
errp->re_status = stat = RPC_TIMEDOUT;
533
cu->cu_sent -= CWNDSCALE;
534
if (cu->cu_cwnd_wait) {
535
cu->cu_cwnd_wait = FALSE;
536
wakeup(&cu->cu_cwnd_wait);
537
}
538
goto out;
539
}
540
541
get_reply:
542
for (;;) {
543
/* Decide how long to wait. */
544
if (next_sendtime < timeout)
545
tv = next_sendtime;
546
else
547
tv = timeout;
548
tv -= time_waited;
549
550
if (tv > 0) {
551
if (cu->cu_closing || cu->cu_closed) {
552
error = 0;
553
cr->cr_error = ESHUTDOWN;
554
} else {
555
error = msleep(cr, &cs->cs_lock,
556
cu->cu_waitflag, cu->cu_waitchan, tv);
557
}
558
} else {
559
error = EWOULDBLOCK;
560
}
561
562
TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
563
cu->cu_sent -= CWNDSCALE;
564
if (cu->cu_cwnd_wait) {
565
cu->cu_cwnd_wait = FALSE;
566
wakeup(&cu->cu_cwnd_wait);
567
}
568
569
if (!error) {
570
/*
571
* We were woken up by the upcall. If the
572
* upcall had a receive error, report that,
573
* otherwise we have a reply.
574
*/
575
if (cr->cr_error) {
576
errp->re_errno = cr->cr_error;
577
errp->re_status = stat = RPC_CANTRECV;
578
goto out;
579
}
580
581
cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
582
+ cu->cu_cwnd / 2) / cu->cu_cwnd;
583
if (cu->cu_cwnd > MAXCWND)
584
cu->cu_cwnd = MAXCWND;
585
586
if (rt) {
587
/*
588
* Add one to the time since a tick
589
* count of N means that the actual
590
* time taken was somewhere between N
591
* and N+1.
592
*/
593
rtt = ticks - starttime + 1;
594
595
/*
596
* Update our estimate of the round
597
* trip time using roughly the
598
* algorithm described in RFC
599
* 2988. Given an RTT sample R:
600
*
601
* RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
602
* SRTT = (1-alpha) * SRTT + alpha * R
603
*
604
* where alpha = 0.125 and beta = 0.25.
605
*
606
* The initial retransmit timeout is
607
* SRTT + 4*RTTVAR and doubles on each
608
* retransmision.
609
*/
610
if (rt->rt_srtt == 0) {
611
rt->rt_srtt = rtt;
612
rt->rt_deviate = rtt / 2;
613
} else {
614
int32_t error = rtt - rt->rt_srtt;
615
rt->rt_srtt += error / 8;
616
error = abs(error) - rt->rt_deviate;
617
rt->rt_deviate += error / 4;
618
}
619
rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
620
}
621
622
break;
623
}
624
625
/*
626
* The sleep returned an error so our request is still
627
* on the list. If we got EWOULDBLOCK, we may want to
628
* re-send the request.
629
*/
630
if (error != EWOULDBLOCK) {
631
errp->re_errno = error;
632
if (error == EINTR || error == ERESTART)
633
errp->re_status = stat = RPC_INTR;
634
else
635
errp->re_status = stat = RPC_CANTRECV;
636
goto out;
637
}
638
639
time_waited = ticks - starttime;
640
641
/* Check for timeout. */
642
if (time_waited > timeout) {
643
errp->re_errno = EWOULDBLOCK;
644
errp->re_status = stat = RPC_TIMEDOUT;
645
goto out;
646
}
647
648
/* Retransmit if necessary. */
649
if (time_waited >= next_sendtime) {
650
cu->cu_cwnd /= 2;
651
if (cu->cu_cwnd < CWNDSCALE)
652
cu->cu_cwnd = CWNDSCALE;
653
if (ext && ext->rc_feedback) {
654
mtx_unlock(&cs->cs_lock);
655
if (retrans == 0)
656
ext->rc_feedback(FEEDBACK_REXMIT1,
657
proc, ext->rc_feedback_arg);
658
else
659
ext->rc_feedback(FEEDBACK_REXMIT2,
660
proc, ext->rc_feedback_arg);
661
mtx_lock(&cs->cs_lock);
662
}
663
if (cu->cu_closing || cu->cu_closed) {
664
errp->re_errno = ESHUTDOWN;
665
errp->re_status = stat = RPC_CANTRECV;
666
goto out;
667
}
668
retrans++;
669
/* update retransmit_time */
670
if (retransmit_time < RPC_MAX_BACKOFF * hz)
671
retransmit_time = 2 * retransmit_time;
672
next_sendtime += retransmit_time;
673
goto send_again;
674
}
675
cu->cu_sent += CWNDSCALE;
676
TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
677
}
678
679
got_reply:
680
/*
681
* Now decode and validate the response. We need to drop the
682
* lock since xdr_replymsg may end up sleeping in malloc.
683
*/
684
mtx_unlock(&cs->cs_lock);
685
686
if (ext && ext->rc_feedback)
687
ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
688
689
xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
690
ok = xdr_replymsg(&xdrs, &reply_msg);
691
cr->cr_mrep = NULL;
692
693
if (ok) {
694
if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
695
(reply_msg.acpted_rply.ar_stat == SUCCESS))
696
errp->re_status = stat = RPC_SUCCESS;
697
else
698
stat = _seterr_reply(&reply_msg, &(cu->cu_error));
699
700
if (errp->re_status == RPC_SUCCESS) {
701
results = xdrmbuf_getall(&xdrs);
702
if (! AUTH_VALIDATE(auth, xid,
703
&reply_msg.acpted_rply.ar_verf,
704
&results)) {
705
errp->re_status = stat = RPC_AUTHERROR;
706
errp->re_why = AUTH_INVALIDRESP;
707
if (retrans &&
708
auth->ah_cred.oa_flavor == RPCSEC_GSS) {
709
/*
710
* If we retransmitted, its
711
* possible that we will
712
* receive a reply for one of
713
* the earlier transmissions
714
* (which will use an older
715
* RPCSEC_GSS sequence
716
* number). In this case, just
717
* go back and listen for a
718
* new reply. We could keep a
719
* record of all the seq
720
* numbers we have transmitted
721
* so far so that we could
722
* accept a reply for any of
723
* them here.
724
*/
725
XDR_DESTROY(&xdrs);
726
mtx_lock(&cs->cs_lock);
727
cu->cu_sent += CWNDSCALE;
728
TAILQ_INSERT_TAIL(&cs->cs_pending,
729
cr, cr_link);
730
cr->cr_mrep = NULL;
731
goto get_reply;
732
}
733
} else {
734
*resultsp = results;
735
}
736
} /* end successful completion */
737
/*
738
* If unsuccessful AND error is an authentication error
739
* then refresh credentials and try again, else break
740
*/
741
else if (stat == RPC_AUTHERROR)
742
/* maybe our credentials need to be refreshed ... */
743
if (nrefreshes > 0 &&
744
AUTH_REFRESH(auth, &reply_msg)) {
745
nrefreshes--;
746
XDR_DESTROY(&xdrs);
747
mtx_lock(&cs->cs_lock);
748
goto call_again;
749
}
750
/* end of unsuccessful completion */
751
} /* end of valid reply message */
752
else {
753
errp->re_status = stat = RPC_CANTDECODERES;
754
755
}
756
XDR_DESTROY(&xdrs);
757
mtx_lock(&cs->cs_lock);
758
out:
759
mtx_assert(&cs->cs_lock, MA_OWNED);
760
761
if (mreq)
762
m_freem(mreq);
763
if (cr->cr_mrep)
764
m_freem(cr->cr_mrep);
765
766
cu->cu_threads--;
767
if (cu->cu_closing)
768
wakeup(cu);
769
770
mtx_unlock(&cs->cs_lock);
771
772
if (auth && stat != RPC_SUCCESS)
773
AUTH_VALIDATE(auth, xid, NULL, NULL);
774
775
free(cr, M_RPC);
776
777
return (stat);
778
}
779
780
static void
781
clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
782
{
783
struct cu_data *cu = (struct cu_data *)cl->cl_private;
784
785
*errp = cu->cu_error;
786
}
787
788
static bool_t
789
clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
790
{
791
XDR xdrs;
792
bool_t dummy;
793
794
xdrs.x_op = XDR_FREE;
795
dummy = (*xdr_res)(&xdrs, res_ptr);
796
797
return (dummy);
798
}
799
800
/*ARGSUSED*/
801
static void
802
clnt_dg_abort(CLIENT *h)
803
{
804
}
805
806
static bool_t
807
clnt_dg_control(CLIENT *cl, u_int request, void *info)
808
{
809
struct cu_data *cu = (struct cu_data *)cl->cl_private;
810
struct cu_socket *cs;
811
struct sockaddr *addr;
812
813
cs = cu->cu_socket->so_rcv.sb_upcallarg;
814
mtx_lock(&cs->cs_lock);
815
816
switch (request) {
817
case CLSET_FD_CLOSE:
818
cu->cu_closeit = TRUE;
819
mtx_unlock(&cs->cs_lock);
820
return (TRUE);
821
case CLSET_FD_NCLOSE:
822
cu->cu_closeit = FALSE;
823
mtx_unlock(&cs->cs_lock);
824
return (TRUE);
825
}
826
827
/* for other requests which use info */
828
if (info == NULL) {
829
mtx_unlock(&cs->cs_lock);
830
return (FALSE);
831
}
832
switch (request) {
833
case CLSET_TIMEOUT:
834
if (time_not_ok((struct timeval *)info)) {
835
mtx_unlock(&cs->cs_lock);
836
return (FALSE);
837
}
838
cu->cu_total = *(struct timeval *)info;
839
break;
840
case CLGET_TIMEOUT:
841
*(struct timeval *)info = cu->cu_total;
842
break;
843
case CLSET_RETRY_TIMEOUT:
844
if (time_not_ok((struct timeval *)info)) {
845
mtx_unlock(&cs->cs_lock);
846
return (FALSE);
847
}
848
cu->cu_wait = *(struct timeval *)info;
849
break;
850
case CLGET_RETRY_TIMEOUT:
851
*(struct timeval *)info = cu->cu_wait;
852
break;
853
case CLGET_SVC_ADDR:
854
/*
855
* Slightly different semantics to userland - we use
856
* sockaddr instead of netbuf.
857
*/
858
memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
859
break;
860
case CLSET_SVC_ADDR: /* set to new address */
861
addr = (struct sockaddr *)info;
862
(void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
863
break;
864
case CLGET_XID:
865
*(uint32_t *)info = atomic_load_32(&rpc_xid);
866
break;
867
868
case CLSET_XID:
869
/* This will set the xid of the NEXT call */
870
/* decrement by 1 as clnt_dg_call() increments once */
871
atomic_store_32(&rpc_xid, *(uint32_t *)info - 1);
872
break;
873
874
case CLGET_VERS:
875
/*
876
* This RELIES on the information that, in the call body,
877
* the version number field is the fifth field from the
878
* beginning of the RPC header. MUST be changed if the
879
* call_struct is changed
880
*/
881
*(uint32_t *)info =
882
ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
883
4 * BYTES_PER_XDR_UNIT));
884
break;
885
886
case CLSET_VERS:
887
*(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
888
= htonl(*(uint32_t *)info);
889
break;
890
891
case CLGET_PROG:
892
/*
893
* This RELIES on the information that, in the call body,
894
* the program number field is the fourth field from the
895
* beginning of the RPC header. MUST be changed if the
896
* call_struct is changed
897
*/
898
*(uint32_t *)info =
899
ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
900
3 * BYTES_PER_XDR_UNIT));
901
break;
902
903
case CLSET_PROG:
904
*(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
905
= htonl(*(uint32_t *)info);
906
break;
907
case CLSET_ASYNC:
908
cu->cu_async = *(int *)info;
909
break;
910
case CLSET_CONNECT:
911
cu->cu_connect = *(int *)info;
912
break;
913
case CLSET_WAITCHAN:
914
cu->cu_waitchan = (const char *)info;
915
break;
916
case CLGET_WAITCHAN:
917
*(const char **) info = cu->cu_waitchan;
918
break;
919
case CLSET_INTERRUPTIBLE:
920
if (*(int *) info)
921
cu->cu_waitflag = PCATCH;
922
else
923
cu->cu_waitflag = 0;
924
break;
925
case CLGET_INTERRUPTIBLE:
926
if (cu->cu_waitflag)
927
*(int *) info = TRUE;
928
else
929
*(int *) info = FALSE;
930
break;
931
default:
932
mtx_unlock(&cs->cs_lock);
933
return (FALSE);
934
}
935
mtx_unlock(&cs->cs_lock);
936
return (TRUE);
937
}
938
939
static void
940
clnt_dg_close(CLIENT *cl)
941
{
942
struct cu_data *cu = (struct cu_data *)cl->cl_private;
943
struct cu_socket *cs;
944
struct cu_request *cr;
945
946
cs = cu->cu_socket->so_rcv.sb_upcallarg;
947
mtx_lock(&cs->cs_lock);
948
949
if (cu->cu_closed) {
950
mtx_unlock(&cs->cs_lock);
951
return;
952
}
953
954
if (cu->cu_closing) {
955
while (cu->cu_closing)
956
msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
957
KASSERT(cu->cu_closed, ("client should be closed"));
958
mtx_unlock(&cs->cs_lock);
959
return;
960
}
961
962
/*
963
* Abort any pending requests and wait until everyone
964
* has finished with clnt_vc_call.
965
*/
966
cu->cu_closing = TRUE;
967
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
968
if (cr->cr_client == cl) {
969
cr->cr_xid = 0;
970
cr->cr_error = ESHUTDOWN;
971
wakeup(cr);
972
}
973
}
974
975
while (cu->cu_threads)
976
msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
977
978
cu->cu_closing = FALSE;
979
cu->cu_closed = TRUE;
980
981
mtx_unlock(&cs->cs_lock);
982
wakeup(cu);
983
}
984
985
static void
986
clnt_dg_destroy(CLIENT *cl)
987
{
988
struct cu_data *cu = (struct cu_data *)cl->cl_private;
989
struct cu_socket *cs;
990
struct socket *so = NULL;
991
bool_t lastsocketref;
992
993
cs = cu->cu_socket->so_rcv.sb_upcallarg;
994
clnt_dg_close(cl);
995
996
SOCK_RECVBUF_LOCK(cu->cu_socket);
997
mtx_lock(&cs->cs_lock);
998
999
cs->cs_refs--;
1000
if (cs->cs_refs == 0) {
1001
mtx_unlock(&cs->cs_lock);
1002
soupcall_clear(cu->cu_socket, SO_RCV);
1003
clnt_dg_upcallsdone(cu->cu_socket, cs);
1004
SOCK_RECVBUF_UNLOCK(cu->cu_socket);
1005
mtx_destroy(&cs->cs_lock);
1006
mem_free(cs, sizeof(*cs));
1007
lastsocketref = TRUE;
1008
} else {
1009
mtx_unlock(&cs->cs_lock);
1010
SOCK_RECVBUF_UNLOCK(cu->cu_socket);
1011
lastsocketref = FALSE;
1012
}
1013
1014
if (cu->cu_closeit && lastsocketref) {
1015
so = cu->cu_socket;
1016
cu->cu_socket = NULL;
1017
}
1018
1019
if (so)
1020
soclose(so);
1021
1022
if (cl->cl_netid && cl->cl_netid[0])
1023
mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1024
if (cl->cl_tp && cl->cl_tp[0])
1025
mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1026
mem_free(cu, sizeof (*cu));
1027
mem_free(cl, sizeof (CLIENT));
1028
}
1029
1030
/*
1031
* Make sure that the time is not garbage. -1 value is allowed.
1032
*/
1033
static bool_t
1034
time_not_ok(struct timeval *t)
1035
{
1036
return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
1037
t->tv_usec < -1 || t->tv_usec > 1000000);
1038
}
1039
1040
int
1041
clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1042
{
1043
struct cu_socket *cs = (struct cu_socket *) arg;
1044
struct uio uio;
1045
struct mbuf *m;
1046
struct mbuf *control;
1047
struct cu_request *cr;
1048
int error, rcvflag, foundreq;
1049
uint32_t xid;
1050
1051
cs->cs_upcallrefs++;
1052
uio.uio_resid = 1000000000;
1053
uio.uio_td = curthread;
1054
do {
1055
SOCK_RECVBUF_UNLOCK(so);
1056
m = NULL;
1057
control = NULL;
1058
rcvflag = MSG_DONTWAIT;
1059
error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1060
if (control)
1061
m_freem(control);
1062
SOCK_RECVBUF_LOCK(so);
1063
1064
if (error == EWOULDBLOCK)
1065
break;
1066
1067
/*
1068
* If there was an error, wake up all pending
1069
* requests.
1070
*/
1071
if (error) {
1072
mtx_lock(&cs->cs_lock);
1073
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1074
cr->cr_xid = 0;
1075
cr->cr_error = error;
1076
wakeup(cr);
1077
}
1078
mtx_unlock(&cs->cs_lock);
1079
break;
1080
}
1081
1082
/*
1083
* The XID is in the first uint32_t of the reply.
1084
*/
1085
if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) {
1086
/*
1087
* Should never happen.
1088
*/
1089
m_freem(m);
1090
continue;
1091
}
1092
1093
m_copydata(m, 0, sizeof(xid), (char *)&xid);
1094
xid = ntohl(xid);
1095
1096
/*
1097
* Attempt to match this reply with a pending request.
1098
*/
1099
mtx_lock(&cs->cs_lock);
1100
foundreq = 0;
1101
TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1102
if (cr->cr_xid == xid) {
1103
/*
1104
* This one matches. We leave the
1105
* reply mbuf in cr->cr_mrep. Set the
1106
* XID to zero so that we will ignore
1107
* any duplicated replies that arrive
1108
* before clnt_dg_call removes it from
1109
* the queue.
1110
*/
1111
cr->cr_xid = 0;
1112
cr->cr_mrep = m;
1113
cr->cr_error = 0;
1114
foundreq = 1;
1115
wakeup(cr);
1116
break;
1117
}
1118
}
1119
mtx_unlock(&cs->cs_lock);
1120
1121
/*
1122
* If we didn't find the matching request, just drop
1123
* it - its probably a repeated reply.
1124
*/
1125
if (!foundreq)
1126
m_freem(m);
1127
} while (m);
1128
cs->cs_upcallrefs--;
1129
if (cs->cs_upcallrefs < 0)
1130
panic("rpcdg upcall refcnt");
1131
if (cs->cs_upcallrefs == 0)
1132
wakeup(&cs->cs_upcallrefs);
1133
return (SU_OK);
1134
}
1135
1136
/*
1137
* Wait for all upcalls in progress to complete.
1138
*/
1139
static void
1140
clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
1141
{
1142
1143
SOCK_RECVBUF_LOCK_ASSERT(so);
1144
1145
while (cs->cs_upcallrefs > 0)
1146
(void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
1147
"rpcdgup", 0);
1148
}
1149
1150