GitHub Repository: awilliam/linux-vfio
Path: blob/master/net/sunrpc/sched.c

/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <[email protected]>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <[email protected]>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY RPCDBG_SCHED
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE (2048)
#define RPC_BUFFER_POOLSIZE (8)
#define RPC_TASK_POOLSIZE (8)
static struct kmem_cache *rpc_task_slabp __read_mostly;
static struct kmem_cache *rpc_buffer_slabp __read_mostly;
static mempool_t *rpc_task_mempool __read_mostly;
static mempool_t *rpc_buffer_mempool __read_mostly;

static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
	INIT_LIST_HEAD(&queue->timer_list.list);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

static int rpc_wait_bit_killable(void *word)
{
	if (fatal_signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
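
/*
 * Example (illustrative sketch, names hypothetical): a caller that needs to
 * block until an async task has finished typically holds a reference and
 * waits on the RPC_TASK_ACTIVE bit, e.g. via the rpc_wait_for_completion_task()
 * wrapper, which is assumed here to pass a NULL action (i.e. the default
 * rpc_wait_bit_killable):
 *
 *	struct rpc_task *task = rpc_run_task(&task_setup_data);
 *	if (!IS_ERR(task)) {
 *		rpc_wait_for_completion_task(task);
 *		status = task->tk_status;
 *		rpc_put_task(task);
 *	}
 */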

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(rpciod_workqueue, &task->u.tk_work);
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* We shouldn't ever put an inactive task to sleep */
	BUG_ON(!RPC_IS_ACTIVATED(task));

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);
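
/*
 * Example (illustrative sketch, names hypothetical): rpc_sleep_on() pairs
 * with one of the rpc_wake_up*() helpers.  A state-machine action typically
 * parks the task and returns, and a later event wakes it up again:
 *
 *	static void example_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_timeout = 0;				// no timer
 *		rpc_sleep_on(&example_waitq, task, NULL);	// park the task
 *	}
 *
 *	// ...later, once the awaited resource becomes available:
 *	rpc_wake_up_queued_task(&example_waitq, task);
 */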

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
		__rpc_do_wake_up_task(queue, task);
}

/*
 * Tests whether rpc queue is empty
 */
int rpc_queue_empty(struct rpc_wait_queue *queue)
{
	int res;

	spin_lock_bh(&queue->lock);
	res = queue->qlen;
	spin_unlock_bh(&queue->lock);
	return res == 0;
}
EXPORT_SYMBOL_GPL(rpc_queue_empty);

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	rpc_wake_up_task_queue_locked(queue, task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			rpc_wake_up_task_queue_locked(queue, task);
	}
	spin_unlock_bh(&queue->lock);

	return task;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
			rpc_wake_up_task_queue_locked(queue, task);
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task, *next;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);
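
/*
 * Example (illustrative sketch): a completion callback that wants to back
 * off and retry later might combine rpc_delay() with rpc_restart_call(),
 * roughly as the NFSv3 "jukebox" handling does:
 *
 *	if (task->tk_status == -EJUKEBOX) {
 *		rpc_delay(task, 3 * HZ);	// requeue for ~3 seconds
 *		task->tk_status = 0;
 *		rpc_restart_call(task);
 *	}
 */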

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
		}
	}
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	if (RPC_IS_QUEUED(task))
		rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Execute any pending callback first.
		 */
		do_action = task->tk_callback;
		task->tk_callback = NULL;
		if (do_action == NULL) {
			/*
			 * Perform the next FSM step.
			 * tk_action may be NULL if the task has been killed.
			 * In particular, note that rpc_killall_tasks may
			 * do this at any time, so beware when dereferencing.
			 */
			do_action = task->tk_action;
			if (do_action == NULL)
				break;
		}
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 * released. In particular note that tk_release() will have
 * been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_make_runnable(task);
	if (!RPC_IS_ASYNC(task))
		__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_buffer *buf;
	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return NULL;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
	size_t size;
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC: freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);
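
/*
 * Example (illustrative sketch): rpc_malloc()/rpc_free() normally serve as a
 * transport's default buffer allocator (the buf_alloc/buf_free methods of
 * struct rpc_xprt_ops), but a direct pairing would look like:
 *
 *	void *buf = rpc_malloc(task, len);	// len is a hypothetical size
 *	if (buf != NULL) {
 *		// ...marshal the request into buf...
 *		rpc_free(buf);
 *	}
 */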

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	/* starting timestamp */
	task->tk_start = ktime_get();

	dprintk("RPC: new task initialized, procpid %u\n",
			task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task *task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL) {
			rpc_release_calldata(setup_data->callback_ops,
					setup_data->callback_data);
			return ERR_PTR(-ENOMEM);
		}
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	dprintk("RPC: allocated task %p\n", task);
	return task;
}
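
/*
 * Example (illustrative sketch, names hypothetical): rpc_new_task() is
 * normally reached through rpc_run_task() with a filled-in rpc_task_setup,
 * roughly:
 *
 *	struct rpc_task_setup task_setup_data = {
 *		.rpc_client = clnt,
 *		.rpc_message = &msg,
 *		.callback_ops = &example_call_ops,
 *		.callback_data = calldata,
 *		.flags = RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task = rpc_run_task(&task_setup_data);
 */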

static void rpc_free_task(struct rpc_task *task)
{
	const struct rpc_call_ops *tk_ops = task->tk_ops;
	void *calldata = task->tk_calldata;

	if (task->tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
	rpc_release_calldata(tk_ops, calldata);
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		put_rpccred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	BUG_ON (RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	dprintk("RPC: creating workqueue rpciod\n");
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
	rpciod_workqueue = wq;
	return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC: destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
			sizeof(struct rpc_task),
			0, SLAB_HWCACHE_ALIGN,
			NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
			RPC_BUFFER_MAXSIZE,
			0, SLAB_HWCACHE_ALIGN,
			NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
			rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
			rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}