Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/rpc/svc.c
39477 views
1
/* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3
/*-
4
* SPDX-License-Identifier: BSD-3-Clause
5
*
6
* Copyright (c) 2009, Sun Microsystems, Inc.
7
* All rights reserved.
8
*
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are met:
11
* - Redistributions of source code must retain the above copyright notice,
12
* this list of conditions and the following disclaimer.
13
* - Redistributions in binary form must reproduce the above copyright notice,
14
* this list of conditions and the following disclaimer in the documentation
15
* and/or other materials provided with the distribution.
16
* - Neither the name of Sun Microsystems, Inc. nor the names of its
17
* contributors may be used to endorse or promote products derived
18
* from this software without specific prior written permission.
19
*
20
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
* POSSIBILITY OF SUCH DAMAGE.
31
*/
32
33
#include <sys/cdefs.h>
34
/*
35
* svc.c, Server-side remote procedure call interface.
36
*
37
* There are two sets of procedures here. The xprt routines are
38
* for handling transport handles. The svc routines handle the
39
* list of service routines.
40
*
41
* Copyright (C) 1984, Sun Microsystems, Inc.
42
*/
43
44
#include <sys/param.h>
45
#include <sys/jail.h>
46
#include <sys/lock.h>
47
#include <sys/kernel.h>
48
#include <sys/kthread.h>
49
#include <sys/malloc.h>
50
#include <sys/mbuf.h>
51
#include <sys/mutex.h>
52
#include <sys/proc.h>
53
#include <sys/protosw.h>
54
#include <sys/queue.h>
55
#include <sys/socketvar.h>
56
#include <sys/systm.h>
57
#include <sys/smp.h>
58
#include <sys/sx.h>
59
#include <sys/ucred.h>
60
61
#include <netinet/tcp.h>
62
63
#include <rpc/rpc.h>
64
#include <rpc/rpcb_clnt.h>
65
#include <rpc/replay.h>
66
67
#include <rpc/rpc_com.h>
68
69
#define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
70
#define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73
char *);
74
static void svc_new_thread(SVCGROUP *grp);
75
static void xprt_unregister_locked(SVCXPRT *xprt);
76
static void svc_change_space_used(SVCPOOL *pool, long delta);
77
static bool_t svc_request_space_available(SVCPOOL *pool);
78
static void svcpool_cleanup(SVCPOOL *pool);
79
80
/* *************** SVCXPRT related stuff **************** */
81
82
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85
86
SVCPOOL*
87
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88
{
89
SVCPOOL *pool;
90
SVCGROUP *grp;
91
int g;
92
93
pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94
95
mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96
pool->sp_name = name;
97
pool->sp_state = SVCPOOL_INIT;
98
pool->sp_proc = NULL;
99
TAILQ_INIT(&pool->sp_callouts);
100
TAILQ_INIT(&pool->sp_lcallouts);
101
pool->sp_minthreads = 1;
102
pool->sp_maxthreads = 1;
103
pool->sp_groupcount = 1;
104
for (g = 0; g < SVC_MAXGROUPS; g++) {
105
grp = &pool->sp_groups[g];
106
mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107
grp->sg_pool = pool;
108
grp->sg_state = SVCPOOL_ACTIVE;
109
TAILQ_INIT(&grp->sg_xlist);
110
TAILQ_INIT(&grp->sg_active);
111
LIST_INIT(&grp->sg_idlethreads);
112
grp->sg_minthreads = 1;
113
grp->sg_maxthreads = 1;
114
}
115
116
/*
117
* Don't use more than a quarter of mbuf clusters. Nota bene:
118
* nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119
* on LP64 architectures, so cast to u_long to avoid undefined
120
* behavior. (ILP32 architectures cannot have nmbclusters
121
* large enough to overflow for other reasons.)
122
*/
123
pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124
pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125
126
sysctl_ctx_init(&pool->sp_sysctl);
127
if (IS_DEFAULT_VNET(curvnet) && sysctl_base) {
128
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129
"minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
130
pool, 0, svcpool_minthread_sysctl, "I",
131
"Minimal number of threads");
132
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133
"maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
134
pool, 0, svcpool_maxthread_sysctl, "I",
135
"Maximal number of threads");
136
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137
"threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
138
pool, 0, svcpool_threads_sysctl, "I",
139
"Current number of threads");
140
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141
"groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142
"Number of thread groups");
143
144
SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145
"request_space_used", CTLFLAG_RD,
146
&pool->sp_space_used,
147
"Space in parsed but not handled requests.");
148
149
SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150
"request_space_used_highest", CTLFLAG_RD,
151
&pool->sp_space_used_highest,
152
"Highest space used since reboot.");
153
154
SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155
"request_space_high", CTLFLAG_RW,
156
&pool->sp_space_high,
157
"Maximum space in parsed but not handled requests.");
158
159
SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160
"request_space_low", CTLFLAG_RW,
161
&pool->sp_space_low,
162
"Low water mark for request space.");
163
164
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165
"request_space_throttled", CTLFLAG_RD,
166
&pool->sp_space_throttled, 0,
167
"Whether nfs requests are currently throttled");
168
169
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170
"request_space_throttle_count", CTLFLAG_RD,
171
&pool->sp_space_throttle_count, 0,
172
"Count of times throttling based on request space has occurred");
173
}
174
175
return pool;
176
}
177
178
/*
179
* Code common to svcpool_destroy() and svcpool_close(), which cleans up
180
* the pool data structures.
181
*/
182
static void
183
svcpool_cleanup(SVCPOOL *pool)
184
{
185
SVCGROUP *grp;
186
SVCXPRT *xprt, *nxprt;
187
struct svc_callout *s;
188
struct svc_loss_callout *sl;
189
struct svcxprt_list cleanup;
190
int g;
191
192
TAILQ_INIT(&cleanup);
193
194
for (g = 0; g < SVC_MAXGROUPS; g++) {
195
grp = &pool->sp_groups[g];
196
mtx_lock(&grp->sg_lock);
197
while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198
xprt_unregister_locked(xprt);
199
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200
}
201
mtx_unlock(&grp->sg_lock);
202
}
203
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204
if (xprt->xp_socket != NULL)
205
soshutdown(xprt->xp_socket, SHUT_WR);
206
SVC_RELEASE(xprt);
207
}
208
209
mtx_lock(&pool->sp_lock);
210
while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
211
mtx_unlock(&pool->sp_lock);
212
svc_unreg(pool, s->sc_prog, s->sc_vers);
213
mtx_lock(&pool->sp_lock);
214
}
215
while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
216
mtx_unlock(&pool->sp_lock);
217
svc_loss_unreg(pool, sl->slc_dispatch);
218
mtx_lock(&pool->sp_lock);
219
}
220
mtx_unlock(&pool->sp_lock);
221
}
222
223
void
224
svcpool_destroy(SVCPOOL *pool)
225
{
226
SVCGROUP *grp;
227
int g;
228
229
svcpool_cleanup(pool);
230
231
for (g = 0; g < SVC_MAXGROUPS; g++) {
232
grp = &pool->sp_groups[g];
233
mtx_destroy(&grp->sg_lock);
234
}
235
mtx_destroy(&pool->sp_lock);
236
237
if (pool->sp_rcache)
238
replay_freecache(pool->sp_rcache);
239
240
sysctl_ctx_free(&pool->sp_sysctl);
241
free(pool, M_RPC);
242
}
243
244
/*
245
* Similar to svcpool_destroy(), except that it does not destroy the actual
246
* data structures. As such, "pool" may be used again.
247
*/
248
void
249
svcpool_close(SVCPOOL *pool)
250
{
251
SVCGROUP *grp;
252
int g;
253
254
svcpool_cleanup(pool);
255
256
/* Now, initialize the pool's state for a fresh svc_run() call. */
257
mtx_lock(&pool->sp_lock);
258
pool->sp_state = SVCPOOL_INIT;
259
mtx_unlock(&pool->sp_lock);
260
for (g = 0; g < SVC_MAXGROUPS; g++) {
261
grp = &pool->sp_groups[g];
262
mtx_lock(&grp->sg_lock);
263
grp->sg_state = SVCPOOL_ACTIVE;
264
mtx_unlock(&grp->sg_lock);
265
}
266
}
267
268
/*
269
* Sysctl handler to get the present thread count on a pool
270
*/
271
static int
272
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
273
{
274
SVCPOOL *pool;
275
int threads, error, g;
276
277
pool = oidp->oid_arg1;
278
threads = 0;
279
mtx_lock(&pool->sp_lock);
280
for (g = 0; g < pool->sp_groupcount; g++)
281
threads += pool->sp_groups[g].sg_threadcount;
282
mtx_unlock(&pool->sp_lock);
283
error = sysctl_handle_int(oidp, &threads, 0, req);
284
return (error);
285
}
286
287
/*
288
* Sysctl handler to set the minimum thread count on a pool
289
*/
290
static int
291
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
292
{
293
SVCPOOL *pool;
294
int newminthreads, error, g;
295
296
pool = oidp->oid_arg1;
297
newminthreads = pool->sp_minthreads;
298
error = sysctl_handle_int(oidp, &newminthreads, 0, req);
299
if (error == 0 && newminthreads != pool->sp_minthreads) {
300
if (newminthreads > pool->sp_maxthreads)
301
return (EINVAL);
302
mtx_lock(&pool->sp_lock);
303
pool->sp_minthreads = newminthreads;
304
for (g = 0; g < pool->sp_groupcount; g++) {
305
pool->sp_groups[g].sg_minthreads = max(1,
306
pool->sp_minthreads / pool->sp_groupcount);
307
}
308
mtx_unlock(&pool->sp_lock);
309
}
310
return (error);
311
}
312
313
/*
314
* Sysctl handler to set the maximum thread count on a pool
315
*/
316
static int
317
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
318
{
319
SVCPOOL *pool;
320
int newmaxthreads, error, g;
321
322
pool = oidp->oid_arg1;
323
newmaxthreads = pool->sp_maxthreads;
324
error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
325
if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
326
if (newmaxthreads < pool->sp_minthreads)
327
return (EINVAL);
328
mtx_lock(&pool->sp_lock);
329
pool->sp_maxthreads = newmaxthreads;
330
for (g = 0; g < pool->sp_groupcount; g++) {
331
pool->sp_groups[g].sg_maxthreads = max(1,
332
pool->sp_maxthreads / pool->sp_groupcount);
333
}
334
mtx_unlock(&pool->sp_lock);
335
}
336
return (error);
337
}
338
339
/*
340
* Activate a transport handle.
341
*/
342
void
343
xprt_register(SVCXPRT *xprt)
344
{
345
SVCPOOL *pool = xprt->xp_pool;
346
SVCGROUP *grp;
347
int g;
348
349
SVC_ACQUIRE(xprt);
350
g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
351
xprt->xp_group = grp = &pool->sp_groups[g];
352
mtx_lock(&grp->sg_lock);
353
xprt->xp_registered = TRUE;
354
xprt->xp_active = FALSE;
355
TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
356
mtx_unlock(&grp->sg_lock);
357
}
358
359
/*
360
* De-activate a transport handle. Note: the locked version doesn't
361
* release the transport - caller must do that after dropping the pool
362
* lock.
363
*/
364
static void
365
xprt_unregister_locked(SVCXPRT *xprt)
366
{
367
SVCGROUP *grp = xprt->xp_group;
368
369
mtx_assert(&grp->sg_lock, MA_OWNED);
370
KASSERT(xprt->xp_registered == TRUE,
371
("xprt_unregister_locked: not registered"));
372
xprt_inactive_locked(xprt);
373
TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
374
xprt->xp_registered = FALSE;
375
}
376
377
void
378
xprt_unregister(SVCXPRT *xprt)
379
{
380
SVCGROUP *grp = xprt->xp_group;
381
382
mtx_lock(&grp->sg_lock);
383
if (xprt->xp_registered == FALSE) {
384
/* Already unregistered by another thread */
385
mtx_unlock(&grp->sg_lock);
386
return;
387
}
388
xprt_unregister_locked(xprt);
389
mtx_unlock(&grp->sg_lock);
390
391
if (xprt->xp_socket != NULL)
392
soshutdown(xprt->xp_socket, SHUT_WR);
393
SVC_RELEASE(xprt);
394
}
395
396
/*
397
* Attempt to assign a service thread to this transport.
398
*/
399
static int
400
xprt_assignthread(SVCXPRT *xprt)
401
{
402
SVCGROUP *grp = xprt->xp_group;
403
SVCTHREAD *st;
404
405
mtx_assert(&grp->sg_lock, MA_OWNED);
406
st = LIST_FIRST(&grp->sg_idlethreads);
407
if (st) {
408
LIST_REMOVE(st, st_ilink);
409
SVC_ACQUIRE(xprt);
410
xprt->xp_thread = st;
411
st->st_xprt = xprt;
412
cv_signal(&st->st_cond);
413
return (TRUE);
414
} else {
415
/*
416
* See if we can create a new thread. The
417
* actual thread creation happens in
418
* svc_run_internal because our locking state
419
* is poorly defined (we are typically called
420
* from a socket upcall). Don't create more
421
* than one thread per second.
422
*/
423
if (grp->sg_state == SVCPOOL_ACTIVE
424
&& grp->sg_lastcreatetime < time_uptime
425
&& grp->sg_threadcount < grp->sg_maxthreads) {
426
grp->sg_state = SVCPOOL_THREADWANTED;
427
}
428
}
429
return (FALSE);
430
}
431
432
void
433
xprt_active(SVCXPRT *xprt)
434
{
435
SVCGROUP *grp = xprt->xp_group;
436
437
mtx_lock(&grp->sg_lock);
438
439
if (!xprt->xp_registered) {
440
/*
441
* Race with xprt_unregister - we lose.
442
*/
443
mtx_unlock(&grp->sg_lock);
444
return;
445
}
446
447
if (!xprt->xp_active) {
448
xprt->xp_active = TRUE;
449
if (xprt->xp_thread == NULL) {
450
if (!svc_request_space_available(xprt->xp_pool) ||
451
!xprt_assignthread(xprt))
452
TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
453
xp_alink);
454
}
455
}
456
457
mtx_unlock(&grp->sg_lock);
458
}
459
460
void
461
xprt_inactive_locked(SVCXPRT *xprt)
462
{
463
SVCGROUP *grp = xprt->xp_group;
464
465
mtx_assert(&grp->sg_lock, MA_OWNED);
466
if (xprt->xp_active) {
467
if (xprt->xp_thread == NULL)
468
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
469
xprt->xp_active = FALSE;
470
}
471
}
472
473
void
474
xprt_inactive(SVCXPRT *xprt)
475
{
476
SVCGROUP *grp = xprt->xp_group;
477
478
mtx_lock(&grp->sg_lock);
479
xprt_inactive_locked(xprt);
480
mtx_unlock(&grp->sg_lock);
481
}
482
483
/*
484
* Variant of xprt_inactive() for use only when sure that port is
485
* assigned to thread. For example, within receive handlers.
486
*/
487
void
488
xprt_inactive_self(SVCXPRT *xprt)
489
{
490
491
KASSERT(xprt->xp_thread != NULL,
492
("xprt_inactive_self(%p) with NULL xp_thread", xprt));
493
xprt->xp_active = FALSE;
494
}
495
496
/*
497
* Add a service program to the callout list.
498
* The dispatch routine will be called when a rpc request for this
499
* program number comes in.
500
*/
501
bool_t
502
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
503
void (*dispatch)(struct svc_req *, SVCXPRT *),
504
const struct netconfig *nconf)
505
{
506
SVCPOOL *pool = xprt->xp_pool;
507
struct svc_callout *s;
508
char *netid = NULL;
509
int flag = 0;
510
511
/* VARIABLES PROTECTED BY svc_lock: s, svc_head */
512
513
if (xprt->xp_netid) {
514
netid = strdup(xprt->xp_netid, M_RPC);
515
flag = 1;
516
} else if (nconf && nconf->nc_netid) {
517
netid = strdup(nconf->nc_netid, M_RPC);
518
flag = 1;
519
} /* must have been created with svc_raw_create */
520
if ((netid == NULL) && (flag == 1)) {
521
return (FALSE);
522
}
523
524
mtx_lock(&pool->sp_lock);
525
if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
526
if (netid)
527
free(netid, M_RPC);
528
if (s->sc_dispatch == dispatch)
529
goto rpcb_it; /* he is registering another xptr */
530
mtx_unlock(&pool->sp_lock);
531
return (FALSE);
532
}
533
s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
534
if (s == NULL) {
535
if (netid)
536
free(netid, M_RPC);
537
mtx_unlock(&pool->sp_lock);
538
return (FALSE);
539
}
540
541
s->sc_prog = prog;
542
s->sc_vers = vers;
543
s->sc_dispatch = dispatch;
544
s->sc_netid = netid;
545
TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
546
547
if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
548
((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
549
550
rpcb_it:
551
mtx_unlock(&pool->sp_lock);
552
/* now register the information with the local binder service */
553
if (nconf) {
554
bool_t dummy;
555
struct netconfig tnc;
556
struct netbuf nb;
557
tnc = *nconf;
558
nb.buf = &xprt->xp_ltaddr;
559
nb.len = xprt->xp_ltaddr.ss_len;
560
dummy = rpcb_set(prog, vers, &tnc, &nb);
561
return (dummy);
562
}
563
return (TRUE);
564
}
565
566
/*
567
* Remove a service program from the callout list.
568
*/
569
void
570
svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
571
{
572
struct svc_callout *s;
573
574
/* unregister the information anyway */
575
(void) rpcb_unset(prog, vers, NULL);
576
mtx_lock(&pool->sp_lock);
577
while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
578
TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
579
if (s->sc_netid)
580
mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
581
mem_free(s, sizeof (struct svc_callout));
582
}
583
mtx_unlock(&pool->sp_lock);
584
}
585
586
/*
587
* Add a service connection loss program to the callout list.
588
* The dispatch routine will be called when some port in ths pool die.
589
*/
590
bool_t
591
svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
592
{
593
SVCPOOL *pool = xprt->xp_pool;
594
struct svc_loss_callout *s;
595
596
mtx_lock(&pool->sp_lock);
597
TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
598
if (s->slc_dispatch == dispatch)
599
break;
600
}
601
if (s != NULL) {
602
mtx_unlock(&pool->sp_lock);
603
return (TRUE);
604
}
605
s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
606
if (s == NULL) {
607
mtx_unlock(&pool->sp_lock);
608
return (FALSE);
609
}
610
s->slc_dispatch = dispatch;
611
TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
612
mtx_unlock(&pool->sp_lock);
613
return (TRUE);
614
}
615
616
/*
617
* Remove a service connection loss program from the callout list.
618
*/
619
void
620
svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
621
{
622
struct svc_loss_callout *s;
623
624
mtx_lock(&pool->sp_lock);
625
TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
626
if (s->slc_dispatch == dispatch) {
627
TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
628
free(s, M_RPC);
629
break;
630
}
631
}
632
mtx_unlock(&pool->sp_lock);
633
}
634
635
/* ********************** CALLOUT list related stuff ************* */
636
637
/*
638
* Search the callout list for a program number, return the callout
639
* struct.
640
*/
641
static struct svc_callout *
642
svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
643
{
644
struct svc_callout *s;
645
646
mtx_assert(&pool->sp_lock, MA_OWNED);
647
TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
648
if (s->sc_prog == prog && s->sc_vers == vers
649
&& (netid == NULL || s->sc_netid == NULL ||
650
strcmp(netid, s->sc_netid) == 0))
651
break;
652
}
653
654
return (s);
655
}
656
657
/* ******************* REPLY GENERATION ROUTINES ************ */
658
659
static bool_t
660
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
661
struct mbuf *body)
662
{
663
SVCXPRT *xprt = rqstp->rq_xprt;
664
bool_t ok;
665
666
if (rqstp->rq_args) {
667
m_freem(rqstp->rq_args);
668
rqstp->rq_args = NULL;
669
}
670
671
if (xprt->xp_pool->sp_rcache)
672
replay_setreply(xprt->xp_pool->sp_rcache,
673
rply, svc_getrpccaller(rqstp), body);
674
675
if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
676
return (FALSE);
677
678
ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
679
if (rqstp->rq_addr) {
680
free(rqstp->rq_addr, M_SONAME);
681
rqstp->rq_addr = NULL;
682
}
683
684
return (ok);
685
}
686
687
/*
688
* Send a reply to an rpc request
689
*/
690
bool_t
691
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
692
{
693
struct rpc_msg rply;
694
struct mbuf *m;
695
XDR xdrs;
696
bool_t ok;
697
698
rply.rm_xid = rqstp->rq_xid;
699
rply.rm_direction = REPLY;
700
rply.rm_reply.rp_stat = MSG_ACCEPTED;
701
rply.acpted_rply.ar_verf = rqstp->rq_verf;
702
rply.acpted_rply.ar_stat = SUCCESS;
703
rply.acpted_rply.ar_results.where = NULL;
704
rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
705
706
m = m_getcl(M_WAITOK, MT_DATA, 0);
707
xdrmbuf_create(&xdrs, m, XDR_ENCODE);
708
ok = xdr_results(&xdrs, xdr_location);
709
XDR_DESTROY(&xdrs);
710
711
if (ok) {
712
return (svc_sendreply_common(rqstp, &rply, m));
713
} else {
714
m_freem(m);
715
return (FALSE);
716
}
717
}
718
719
bool_t
720
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
721
{
722
struct rpc_msg rply;
723
724
rply.rm_xid = rqstp->rq_xid;
725
rply.rm_direction = REPLY;
726
rply.rm_reply.rp_stat = MSG_ACCEPTED;
727
rply.acpted_rply.ar_verf = rqstp->rq_verf;
728
rply.acpted_rply.ar_stat = SUCCESS;
729
rply.acpted_rply.ar_results.where = NULL;
730
rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
731
732
return (svc_sendreply_common(rqstp, &rply, m));
733
}
734
735
/*
736
* No procedure error reply
737
*/
738
void
739
svcerr_noproc(struct svc_req *rqstp)
740
{
741
SVCXPRT *xprt = rqstp->rq_xprt;
742
struct rpc_msg rply;
743
744
rply.rm_xid = rqstp->rq_xid;
745
rply.rm_direction = REPLY;
746
rply.rm_reply.rp_stat = MSG_ACCEPTED;
747
rply.acpted_rply.ar_verf = rqstp->rq_verf;
748
rply.acpted_rply.ar_stat = PROC_UNAVAIL;
749
750
if (xprt->xp_pool->sp_rcache)
751
replay_setreply(xprt->xp_pool->sp_rcache,
752
&rply, svc_getrpccaller(rqstp), NULL);
753
754
svc_sendreply_common(rqstp, &rply, NULL);
755
}
756
757
/*
758
* Can't decode args error reply
759
*/
760
void
761
svcerr_decode(struct svc_req *rqstp)
762
{
763
SVCXPRT *xprt = rqstp->rq_xprt;
764
struct rpc_msg rply;
765
766
rply.rm_xid = rqstp->rq_xid;
767
rply.rm_direction = REPLY;
768
rply.rm_reply.rp_stat = MSG_ACCEPTED;
769
rply.acpted_rply.ar_verf = rqstp->rq_verf;
770
rply.acpted_rply.ar_stat = GARBAGE_ARGS;
771
772
if (xprt->xp_pool->sp_rcache)
773
replay_setreply(xprt->xp_pool->sp_rcache,
774
&rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
775
776
svc_sendreply_common(rqstp, &rply, NULL);
777
}
778
779
/*
780
* Some system error
781
*/
782
void
783
svcerr_systemerr(struct svc_req *rqstp)
784
{
785
SVCXPRT *xprt = rqstp->rq_xprt;
786
struct rpc_msg rply;
787
788
rply.rm_xid = rqstp->rq_xid;
789
rply.rm_direction = REPLY;
790
rply.rm_reply.rp_stat = MSG_ACCEPTED;
791
rply.acpted_rply.ar_verf = rqstp->rq_verf;
792
rply.acpted_rply.ar_stat = SYSTEM_ERR;
793
794
if (xprt->xp_pool->sp_rcache)
795
replay_setreply(xprt->xp_pool->sp_rcache,
796
&rply, svc_getrpccaller(rqstp), NULL);
797
798
svc_sendreply_common(rqstp, &rply, NULL);
799
}
800
801
/*
802
* Authentication error reply
803
*/
804
void
805
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
806
{
807
SVCXPRT *xprt = rqstp->rq_xprt;
808
struct rpc_msg rply;
809
810
rply.rm_xid = rqstp->rq_xid;
811
rply.rm_direction = REPLY;
812
rply.rm_reply.rp_stat = MSG_DENIED;
813
rply.rjcted_rply.rj_stat = AUTH_ERROR;
814
rply.rjcted_rply.rj_why = why;
815
816
if (xprt->xp_pool->sp_rcache)
817
replay_setreply(xprt->xp_pool->sp_rcache,
818
&rply, svc_getrpccaller(rqstp), NULL);
819
820
svc_sendreply_common(rqstp, &rply, NULL);
821
}
822
823
/*
824
* Auth too weak error reply
825
*/
826
void
827
svcerr_weakauth(struct svc_req *rqstp)
828
{
829
830
svcerr_auth(rqstp, AUTH_TOOWEAK);
831
}
832
833
/*
834
* Program unavailable error reply
835
*/
836
void
837
svcerr_noprog(struct svc_req *rqstp)
838
{
839
SVCXPRT *xprt = rqstp->rq_xprt;
840
struct rpc_msg rply;
841
842
rply.rm_xid = rqstp->rq_xid;
843
rply.rm_direction = REPLY;
844
rply.rm_reply.rp_stat = MSG_ACCEPTED;
845
rply.acpted_rply.ar_verf = rqstp->rq_verf;
846
rply.acpted_rply.ar_stat = PROG_UNAVAIL;
847
848
if (xprt->xp_pool->sp_rcache)
849
replay_setreply(xprt->xp_pool->sp_rcache,
850
&rply, svc_getrpccaller(rqstp), NULL);
851
852
svc_sendreply_common(rqstp, &rply, NULL);
853
}
854
855
/*
856
* Program version mismatch error reply
857
*/
858
void
859
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
860
{
861
SVCXPRT *xprt = rqstp->rq_xprt;
862
struct rpc_msg rply;
863
864
rply.rm_xid = rqstp->rq_xid;
865
rply.rm_direction = REPLY;
866
rply.rm_reply.rp_stat = MSG_ACCEPTED;
867
rply.acpted_rply.ar_verf = rqstp->rq_verf;
868
rply.acpted_rply.ar_stat = PROG_MISMATCH;
869
rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
870
rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
871
872
if (xprt->xp_pool->sp_rcache)
873
replay_setreply(xprt->xp_pool->sp_rcache,
874
&rply, svc_getrpccaller(rqstp), NULL);
875
876
svc_sendreply_common(rqstp, &rply, NULL);
877
}
878
879
/*
880
* Allocate a new server transport structure. All fields are
881
* initialized to zero and xp_p3 is initialized to point at an
882
* extension structure to hold various flags and authentication
883
* parameters.
884
*/
885
SVCXPRT *
886
svc_xprt_alloc(void)
887
{
888
SVCXPRT *xprt;
889
SVCXPRT_EXT *ext;
890
891
xprt = mem_alloc(sizeof(SVCXPRT));
892
ext = mem_alloc(sizeof(SVCXPRT_EXT));
893
xprt->xp_p3 = ext;
894
refcount_init(&xprt->xp_refs, 1);
895
896
return (xprt);
897
}
898
899
/*
900
* Free a server transport structure.
901
*/
902
void
903
svc_xprt_free(SVCXPRT *xprt)
904
{
905
906
mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
907
/* The size argument is ignored, so 0 is ok. */
908
mem_free(xprt->xp_gidp, 0);
909
mem_free(xprt, sizeof(SVCXPRT));
910
}
911
912
/* ******************* SERVER INPUT STUFF ******************* */
913
914
/*
915
* Read RPC requests from a transport and queue them to be
916
* executed. We handle authentication and replay cache replies here.
917
* Actually dispatching the RPC is deferred till svc_executereq.
918
*/
919
static enum xprt_stat
920
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
921
{
922
SVCPOOL *pool = xprt->xp_pool;
923
struct svc_req *r;
924
struct rpc_msg msg;
925
struct mbuf *args;
926
struct svc_loss_callout *s;
927
enum xprt_stat stat;
928
929
/* now receive msgs from xprtprt (support batch calls) */
930
r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
931
932
msg.rm_call.cb_cred.oa_base = r->rq_credarea;
933
msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
934
r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
935
if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
936
enum auth_stat why;
937
938
/*
939
* Handle replays and authenticate before queuing the
940
* request to be executed.
941
*/
942
SVC_ACQUIRE(xprt);
943
r->rq_xprt = xprt;
944
if (pool->sp_rcache) {
945
struct rpc_msg repmsg;
946
struct mbuf *repbody;
947
enum replay_state rs;
948
rs = replay_find(pool->sp_rcache, &msg,
949
svc_getrpccaller(r), &repmsg, &repbody);
950
switch (rs) {
951
case RS_NEW:
952
break;
953
case RS_DONE:
954
SVC_REPLY(xprt, &repmsg, r->rq_addr,
955
repbody, &r->rq_reply_seq);
956
if (r->rq_addr) {
957
free(r->rq_addr, M_SONAME);
958
r->rq_addr = NULL;
959
}
960
m_freem(args);
961
goto call_done;
962
963
default:
964
m_freem(args);
965
goto call_done;
966
}
967
}
968
969
r->rq_xid = msg.rm_xid;
970
r->rq_prog = msg.rm_call.cb_prog;
971
r->rq_vers = msg.rm_call.cb_vers;
972
r->rq_proc = msg.rm_call.cb_proc;
973
r->rq_size = sizeof(*r) + m_length(args, NULL);
974
r->rq_args = args;
975
if ((why = _authenticate(r, &msg)) != AUTH_OK) {
976
/*
977
* RPCSEC_GSS uses this return code
978
* for requests that form part of its
979
* context establishment protocol and
980
* should not be dispatched to the
981
* application.
982
*/
983
if (why != RPCSEC_GSS_NODISPATCH)
984
svcerr_auth(r, why);
985
goto call_done;
986
}
987
988
if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
989
svcerr_decode(r);
990
goto call_done;
991
}
992
993
/*
994
* Defer enabling DDP until the first non-NULLPROC RPC
995
* is received to allow STARTTLS authentication to
996
* enable TLS offload first.
997
*/
998
if (xprt->xp_doneddp == 0 && r->rq_proc != NULLPROC &&
999
xprt->xp_socket != NULL &&
1000
atomic_cmpset_int(&xprt->xp_doneddp, 0, 1)) {
1001
if (xprt->xp_socket->so_proto->pr_protocol ==
1002
IPPROTO_TCP) {
1003
int optval = 1;
1004
1005
(void)so_setsockopt(xprt->xp_socket,
1006
IPPROTO_TCP, TCP_USE_DDP, &optval,
1007
sizeof(optval));
1008
}
1009
}
1010
1011
/*
1012
* Everything checks out, return request to caller.
1013
*/
1014
*rqstp_ret = r;
1015
r = NULL;
1016
}
1017
call_done:
1018
if (r) {
1019
svc_freereq(r);
1020
r = NULL;
1021
}
1022
if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1023
TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1024
(*s->slc_dispatch)(xprt);
1025
xprt_unregister(xprt);
1026
}
1027
1028
return (stat);
1029
}
1030
1031
static void
1032
svc_executereq(struct svc_req *rqstp)
1033
{
1034
SVCXPRT *xprt = rqstp->rq_xprt;
1035
SVCPOOL *pool = xprt->xp_pool;
1036
int prog_found;
1037
rpcvers_t low_vers;
1038
rpcvers_t high_vers;
1039
struct svc_callout *s;
1040
1041
/* now match message with a registered service*/
1042
prog_found = FALSE;
1043
low_vers = (rpcvers_t) -1L;
1044
high_vers = (rpcvers_t) 0L;
1045
TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1046
if (s->sc_prog == rqstp->rq_prog) {
1047
if (s->sc_vers == rqstp->rq_vers) {
1048
/*
1049
* We hand ownership of r to the
1050
* dispatch method - they must call
1051
* svc_freereq.
1052
*/
1053
(*s->sc_dispatch)(rqstp, xprt);
1054
return;
1055
} /* found correct version */
1056
prog_found = TRUE;
1057
if (s->sc_vers < low_vers)
1058
low_vers = s->sc_vers;
1059
if (s->sc_vers > high_vers)
1060
high_vers = s->sc_vers;
1061
} /* found correct program */
1062
}
1063
1064
/*
1065
* if we got here, the program or version
1066
* is not served ...
1067
*/
1068
if (prog_found)
1069
svcerr_progvers(rqstp, low_vers, high_vers);
1070
else
1071
svcerr_noprog(rqstp);
1072
1073
svc_freereq(rqstp);
1074
}
1075
1076
static void
1077
svc_checkidle(SVCGROUP *grp)
1078
{
1079
SVCXPRT *xprt, *nxprt;
1080
time_t timo;
1081
struct svcxprt_list cleanup;
1082
1083
TAILQ_INIT(&cleanup);
1084
TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1085
/*
1086
* Only some transports have idle timers. Don't time
1087
* something out which is just waking up.
1088
*/
1089
if (!xprt->xp_idletimeout || xprt->xp_thread)
1090
continue;
1091
1092
timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1093
if (time_uptime > timo) {
1094
xprt_unregister_locked(xprt);
1095
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1096
}
1097
}
1098
1099
mtx_unlock(&grp->sg_lock);
1100
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1101
soshutdown(xprt->xp_socket, SHUT_WR);
1102
SVC_RELEASE(xprt);
1103
}
1104
mtx_lock(&grp->sg_lock);
1105
}
1106
1107
static void
1108
svc_assign_waiting_sockets(SVCPOOL *pool)
1109
{
1110
SVCGROUP *grp;
1111
SVCXPRT *xprt;
1112
int g;
1113
1114
for (g = 0; g < pool->sp_groupcount; g++) {
1115
grp = &pool->sp_groups[g];
1116
mtx_lock(&grp->sg_lock);
1117
while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1118
if (xprt_assignthread(xprt))
1119
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1120
else
1121
break;
1122
}
1123
mtx_unlock(&grp->sg_lock);
1124
}
1125
}
1126
1127
static void
1128
svc_change_space_used(SVCPOOL *pool, long delta)
1129
{
1130
unsigned long value;
1131
1132
value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1133
if (delta > 0) {
1134
if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1135
pool->sp_space_throttled = TRUE;
1136
pool->sp_space_throttle_count++;
1137
}
1138
if (value > pool->sp_space_used_highest)
1139
pool->sp_space_used_highest = value;
1140
} else {
1141
if (value < pool->sp_space_low && pool->sp_space_throttled) {
1142
pool->sp_space_throttled = FALSE;
1143
svc_assign_waiting_sockets(pool);
1144
}
1145
}
1146
}
1147
1148
static bool_t
1149
svc_request_space_available(SVCPOOL *pool)
1150
{
1151
1152
if (pool->sp_space_throttled)
1153
return (FALSE);
1154
return (TRUE);
1155
}
1156
1157
static void
1158
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1159
{
1160
SVCPOOL *pool = grp->sg_pool;
1161
SVCTHREAD *st, *stpref;
1162
SVCXPRT *xprt;
1163
enum xprt_stat stat;
1164
struct svc_req *rqstp;
1165
struct proc *p;
1166
long sz;
1167
int error;
1168
1169
st = mem_alloc(sizeof(*st));
1170
mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1171
st->st_pool = pool;
1172
st->st_xprt = NULL;
1173
STAILQ_INIT(&st->st_reqs);
1174
cv_init(&st->st_cond, "rpcsvc");
1175
1176
mtx_lock(&grp->sg_lock);
1177
1178
/*
1179
* If we are a new thread which was spawned to cope with
1180
* increased load, set the state back to SVCPOOL_ACTIVE.
1181
*/
1182
if (grp->sg_state == SVCPOOL_THREADSTARTING)
1183
grp->sg_state = SVCPOOL_ACTIVE;
1184
1185
while (grp->sg_state != SVCPOOL_CLOSING) {
1186
/*
1187
* Create new thread if requested.
1188
*/
1189
if (grp->sg_state == SVCPOOL_THREADWANTED) {
1190
grp->sg_state = SVCPOOL_THREADSTARTING;
1191
grp->sg_lastcreatetime = time_uptime;
1192
mtx_unlock(&grp->sg_lock);
1193
svc_new_thread(grp);
1194
mtx_lock(&grp->sg_lock);
1195
continue;
1196
}
1197
1198
/*
1199
* Check for idle transports once per second.
1200
*/
1201
if (time_uptime > grp->sg_lastidlecheck) {
1202
grp->sg_lastidlecheck = time_uptime;
1203
svc_checkidle(grp);
1204
}
1205
1206
xprt = st->st_xprt;
1207
if (!xprt) {
1208
/*
1209
* Enforce maxthreads count.
1210
*/
1211
if (!ismaster && grp->sg_threadcount >
1212
grp->sg_maxthreads)
1213
break;
1214
1215
/*
1216
* Before sleeping, see if we can find an
1217
* active transport which isn't being serviced
1218
* by a thread.
1219
*/
1220
if (svc_request_space_available(pool) &&
1221
(xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1222
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1223
SVC_ACQUIRE(xprt);
1224
xprt->xp_thread = st;
1225
st->st_xprt = xprt;
1226
continue;
1227
}
1228
1229
LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1230
if (ismaster || (!ismaster &&
1231
grp->sg_threadcount > grp->sg_minthreads))
1232
error = cv_timedwait_sig(&st->st_cond,
1233
&grp->sg_lock, 5 * hz);
1234
else
1235
error = cv_wait_sig(&st->st_cond,
1236
&grp->sg_lock);
1237
if (st->st_xprt == NULL)
1238
LIST_REMOVE(st, st_ilink);
1239
1240
/*
1241
* Reduce worker thread count when idle.
1242
*/
1243
if (error == EWOULDBLOCK) {
1244
if (!ismaster
1245
&& (grp->sg_threadcount
1246
> grp->sg_minthreads)
1247
&& !st->st_xprt)
1248
break;
1249
} else if (error != 0) {
1250
KASSERT(error == EINTR || error == ERESTART,
1251
("non-signal error %d", error));
1252
mtx_unlock(&grp->sg_lock);
1253
p = curproc;
1254
PROC_LOCK(p);
1255
if (P_SHOULDSTOP(p) ||
1256
(p->p_flag & P_TOTAL_STOP) != 0) {
1257
thread_suspend_check(0);
1258
PROC_UNLOCK(p);
1259
mtx_lock(&grp->sg_lock);
1260
} else {
1261
PROC_UNLOCK(p);
1262
svc_exit(pool);
1263
mtx_lock(&grp->sg_lock);
1264
break;
1265
}
1266
}
1267
continue;
1268
}
1269
mtx_unlock(&grp->sg_lock);
1270
1271
/*
1272
* Drain the transport socket and queue up any RPCs.
1273
*/
1274
xprt->xp_lastactive = time_uptime;
1275
do {
1276
if (!svc_request_space_available(pool))
1277
break;
1278
rqstp = NULL;
1279
stat = svc_getreq(xprt, &rqstp);
1280
if (rqstp) {
1281
svc_change_space_used(pool, rqstp->rq_size);
1282
/*
1283
* See if the application has a preference
1284
* for some other thread.
1285
*/
1286
if (pool->sp_assign) {
1287
stpref = pool->sp_assign(st, rqstp);
1288
rqstp->rq_thread = stpref;
1289
STAILQ_INSERT_TAIL(&stpref->st_reqs,
1290
rqstp, rq_link);
1291
mtx_unlock(&stpref->st_lock);
1292
if (stpref != st)
1293
rqstp = NULL;
1294
} else {
1295
rqstp->rq_thread = st;
1296
STAILQ_INSERT_TAIL(&st->st_reqs,
1297
rqstp, rq_link);
1298
}
1299
}
1300
} while (rqstp == NULL && stat == XPRT_MOREREQS
1301
&& grp->sg_state != SVCPOOL_CLOSING);
1302
1303
/*
1304
* Move this transport to the end of the active list to
1305
* ensure fairness when multiple transports are active.
1306
* If this was the last queued request, svc_getreq will end
1307
* up calling xprt_inactive to remove from the active list.
1308
*/
1309
mtx_lock(&grp->sg_lock);
1310
xprt->xp_thread = NULL;
1311
st->st_xprt = NULL;
1312
if (xprt->xp_active) {
1313
if (!svc_request_space_available(pool) ||
1314
!xprt_assignthread(xprt))
1315
TAILQ_INSERT_TAIL(&grp->sg_active,
1316
xprt, xp_alink);
1317
}
1318
mtx_unlock(&grp->sg_lock);
1319
SVC_RELEASE(xprt);
1320
1321
/*
1322
* Execute what we have queued.
1323
*/
1324
mtx_lock(&st->st_lock);
1325
while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1326
STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1327
mtx_unlock(&st->st_lock);
1328
sz = (long)rqstp->rq_size;
1329
svc_executereq(rqstp);
1330
svc_change_space_used(pool, -sz);
1331
mtx_lock(&st->st_lock);
1332
}
1333
mtx_unlock(&st->st_lock);
1334
mtx_lock(&grp->sg_lock);
1335
}
1336
1337
if (st->st_xprt) {
1338
xprt = st->st_xprt;
1339
st->st_xprt = NULL;
1340
SVC_RELEASE(xprt);
1341
}
1342
KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1343
mtx_destroy(&st->st_lock);
1344
cv_destroy(&st->st_cond);
1345
mem_free(st, sizeof(*st));
1346
1347
grp->sg_threadcount--;
1348
if (!ismaster)
1349
wakeup(grp);
1350
mtx_unlock(&grp->sg_lock);
1351
}
1352
1353
static void
1354
svc_thread_start(void *arg)
1355
{
1356
1357
svc_run_internal((SVCGROUP *) arg, FALSE);
1358
kthread_exit();
1359
}
1360
1361
static void
1362
svc_new_thread(SVCGROUP *grp)
1363
{
1364
SVCPOOL *pool = grp->sg_pool;
1365
struct thread *td;
1366
1367
mtx_lock(&grp->sg_lock);
1368
grp->sg_threadcount++;
1369
mtx_unlock(&grp->sg_lock);
1370
kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1371
"%s: service", pool->sp_name);
1372
}
1373
1374
void
1375
svc_run(SVCPOOL *pool)
1376
{
1377
int g, i;
1378
struct proc *p;
1379
struct thread *td;
1380
SVCGROUP *grp;
1381
1382
p = curproc;
1383
td = curthread;
1384
snprintf(td->td_name, sizeof(td->td_name),
1385
"%s: master", pool->sp_name);
1386
pool->sp_state = SVCPOOL_ACTIVE;
1387
pool->sp_proc = p;
1388
1389
/* Choose group count based on number of threads and CPUs. */
1390
pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1391
min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1392
for (g = 0; g < pool->sp_groupcount; g++) {
1393
grp = &pool->sp_groups[g];
1394
grp->sg_minthreads = max(1,
1395
pool->sp_minthreads / pool->sp_groupcount);
1396
grp->sg_maxthreads = max(1,
1397
pool->sp_maxthreads / pool->sp_groupcount);
1398
grp->sg_lastcreatetime = time_uptime;
1399
}
1400
1401
/* Starting threads */
1402
pool->sp_groups[0].sg_threadcount++;
1403
for (g = 0; g < pool->sp_groupcount; g++) {
1404
grp = &pool->sp_groups[g];
1405
for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1406
svc_new_thread(grp);
1407
}
1408
svc_run_internal(&pool->sp_groups[0], TRUE);
1409
1410
/* Waiting for threads to stop. */
1411
for (g = 0; g < pool->sp_groupcount; g++) {
1412
grp = &pool->sp_groups[g];
1413
mtx_lock(&grp->sg_lock);
1414
while (grp->sg_threadcount > 0)
1415
msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1416
mtx_unlock(&grp->sg_lock);
1417
}
1418
}
1419
1420
void
1421
svc_exit(SVCPOOL *pool)
1422
{
1423
SVCGROUP *grp;
1424
SVCTHREAD *st;
1425
int g;
1426
1427
pool->sp_state = SVCPOOL_CLOSING;
1428
for (g = 0; g < pool->sp_groupcount; g++) {
1429
grp = &pool->sp_groups[g];
1430
mtx_lock(&grp->sg_lock);
1431
if (grp->sg_state != SVCPOOL_CLOSING) {
1432
grp->sg_state = SVCPOOL_CLOSING;
1433
LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1434
cv_signal(&st->st_cond);
1435
}
1436
mtx_unlock(&grp->sg_lock);
1437
}
1438
}
1439
1440
bool_t
1441
svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1442
{
1443
struct mbuf *m;
1444
XDR xdrs;
1445
bool_t stat;
1446
1447
m = rqstp->rq_args;
1448
rqstp->rq_args = NULL;
1449
1450
xdrmbuf_create(&xdrs, m, XDR_DECODE);
1451
stat = xargs(&xdrs, args);
1452
XDR_DESTROY(&xdrs);
1453
1454
return (stat);
1455
}
1456
1457
bool_t
1458
svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1459
{
1460
XDR xdrs;
1461
1462
if (rqstp->rq_addr) {
1463
free(rqstp->rq_addr, M_SONAME);
1464
rqstp->rq_addr = NULL;
1465
}
1466
1467
xdrs.x_op = XDR_FREE;
1468
return (xargs(&xdrs, args));
1469
}
1470
1471
void
1472
svc_freereq(struct svc_req *rqstp)
1473
{
1474
SVCTHREAD *st;
1475
SVCPOOL *pool;
1476
1477
st = rqstp->rq_thread;
1478
if (st) {
1479
pool = st->st_pool;
1480
if (pool->sp_done)
1481
pool->sp_done(st, rqstp);
1482
}
1483
1484
if (rqstp->rq_auth.svc_ah_ops)
1485
SVCAUTH_RELEASE(&rqstp->rq_auth);
1486
1487
if (rqstp->rq_xprt) {
1488
SVC_RELEASE(rqstp->rq_xprt);
1489
}
1490
1491
if (rqstp->rq_addr)
1492
free(rqstp->rq_addr, M_SONAME);
1493
1494
if (rqstp->rq_args)
1495
m_freem(rqstp->rq_args);
1496
1497
free(rqstp, M_RPC);
1498
}
1499
1500