Path: sys/contrib/openzfs/module/os/linux/spl/spl-taskq.c
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 * Copyright (C) 2007 The Regents of the University of California.
 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Brian Behlendorf <[email protected]>.
 * UCRL-CODE-235197
 *
 * This file is part of the SPL, Solaris Porting Layer.
 *
 * The SPL is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * The SPL is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the SPL. If not, see <http://www.gnu.org/licenses/>.
 *
 * Solaris Porting Layer (SPL) Task Queue Implementation.
 */
/*
 * Copyright (c) 2024, Klara Inc.
 * Copyright (c) 2024, Syneto
 */

#include <sys/timer.h>
#include <sys/taskq.h>
#include <sys/kmem.h>
#include <sys/tsd.h>
#include <sys/trace_spl.h>
#include <sys/time.h>
#include <sys/atomic.h>
#include <sys/kstat.h>
#include <linux/cpuhotplug.h>
#include <linux/mod_compat.h>

/*
 * Linux 6.2 renamed del_timer_sync() to timer_delete_sync(); point the new
 * name at the old one for kernels that lack it.
 */
#ifndef HAVE_TIMER_DELETE_SYNC
#define	timer_delete_sync(t)	del_timer_sync(t)
#endif

typedef struct taskq_kstats {
	/* static values, for completeness */
	kstat_named_t tqks_threads_max;
	kstat_named_t tqks_entry_pool_min;
	kstat_named_t tqks_entry_pool_max;

	/* gauges (inc/dec counters, current value) */
	kstat_named_t tqks_threads_active;
	kstat_named_t tqks_threads_idle;
	kstat_named_t tqks_threads_total;
	kstat_named_t tqks_tasks_pending;
	kstat_named_t tqks_tasks_priority;
	kstat_named_t tqks_tasks_total;
	kstat_named_t tqks_tasks_delayed;
	kstat_named_t tqks_entries_free;

	/* counters (inc only, since taskq creation) */
	kstat_named_t tqks_threads_created;
	kstat_named_t tqks_threads_destroyed;
	kstat_named_t tqks_tasks_dispatched;
	kstat_named_t tqks_tasks_dispatched_delayed;
	kstat_named_t tqks_tasks_executed_normal;
	kstat_named_t tqks_tasks_executed_priority;
	kstat_named_t tqks_tasks_executed;
	kstat_named_t tqks_tasks_delayed_requeued;
	kstat_named_t tqks_tasks_cancelled;
	kstat_named_t tqks_thread_wakeups;
	kstat_named_t tqks_thread_wakeups_nowork;
	kstat_named_t tqks_thread_sleeps;
} taskq_kstats_t;

static taskq_kstats_t taskq_kstats_template = {
	{ "threads_max", KSTAT_DATA_UINT64 },
	{ "entry_pool_min", KSTAT_DATA_UINT64 },
	{ "entry_pool_max", KSTAT_DATA_UINT64 },
	{ "threads_active", KSTAT_DATA_UINT64 },
	{ "threads_idle", KSTAT_DATA_UINT64 },
	{ "threads_total", KSTAT_DATA_UINT64 },
	{ "tasks_pending", KSTAT_DATA_UINT64 },
	{ "tasks_priority", KSTAT_DATA_UINT64 },
	{ "tasks_total", KSTAT_DATA_UINT64 },
	{ "tasks_delayed", KSTAT_DATA_UINT64 },
	{ "entries_free", KSTAT_DATA_UINT64 },

	{ "threads_created", KSTAT_DATA_UINT64 },
	{ "threads_destroyed", KSTAT_DATA_UINT64 },
	{ "tasks_dispatched", KSTAT_DATA_UINT64 },
	{ "tasks_dispatched_delayed", KSTAT_DATA_UINT64 },
	{ "tasks_executed_normal", KSTAT_DATA_UINT64 },
	{ "tasks_executed_priority", KSTAT_DATA_UINT64 },
	{ "tasks_executed", KSTAT_DATA_UINT64 },
	{ "tasks_delayed_requeued", KSTAT_DATA_UINT64 },
	{ "tasks_cancelled", KSTAT_DATA_UINT64 },
	{ "thread_wakeups", KSTAT_DATA_UINT64 },
	{ "thread_wakeups_nowork", KSTAT_DATA_UINT64 },
	{ "thread_sleeps", KSTAT_DATA_UINT64 },
};

#define	TQSTAT_INC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, 1)
#define	TQSTAT_DEC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, -1)

#define	_TQSTAT_MOD_LIST(mod, tq, t) do { \
	switch (t->tqent_flags & TQENT_LIST_MASK) { \
	case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
	case TQENT_LIST_PENDING: mod(tq, tasks_pending); break; \
	case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break; \
	case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break; \
	} \
} while (0)
#define	TQSTAT_INC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
#define	TQSTAT_DEC_LIST(tq, t) _TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)

#define	TQENT_SET_LIST(t, l) \
	t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;

static int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");

static uint_t spl_taskq_thread_timeout_ms = 5000;
module_param(spl_taskq_thread_timeout_ms, uint, 0644);
MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
	"Minimum idle threads exit interval for dynamic taskqs");

static int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0444);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");

static int spl_taskq_thread_priority = 1;
module_param(spl_taskq_thread_priority, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_priority,
	"Allow non-default priority for taskq threads");

static uint_t spl_taskq_thread_sequential = 4;
module_param(spl_taskq_thread_sequential, uint, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
	"Create new taskq threads after N sequential tasks");

/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);
/* Global dynamic task queue for long delay */
taskq_t *system_delay_taskq;
EXPORT_SYMBOL(system_delay_taskq);

/* Private dedicated taskq for creating new taskq threads on demand. */
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);

/* Multi-callback id for cpu hotplugging. */
static int spl_taskq_cpuhp_state;

/* List of all taskqs */
LIST_HEAD(tq_list);
struct rw_semaphore tq_list_sem;
static uint_t taskq_tsd;

static int
task_km_flags(uint_t flags)
{
	if (flags & TQ_NOSLEEP)
		return (KM_NOSLEEP);

	if (flags & TQ_PUSHPAGE)
		return (KM_PUSHPAGE);

	return (KM_SLEEP);
}

/*
 * taskq_find_by_name - Find the largest instance number of a named taskq.
 */
static int
taskq_find_by_name(const char *name)
{
	struct list_head *tql = NULL;
	taskq_t *tq;

	list_for_each_prev(tql, &tq_list) {
		tq = list_entry(tql, taskq_t, tq_taskqs);
		if (strcmp(name, tq->tq_name) == 0)
			return (tq->tq_instance);
	}
	return (-1);
}

/*
 * NOTE: Must be called with tq->tq_lock held, returns a taskq_ent_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
{
	taskq_ent_t *t;
	int count = 0;

	ASSERT(tq);
retry:
	/* Acquire taskq_ent_t's from free list if available */
	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
		ASSERT(!timer_pending(&t->tqent_timer));

		list_del_init(&t->tqent_list);
		TQSTAT_DEC(tq, entries_free);
		return (t);
	}

	/* Free list is empty and memory allocations are prohibited */
	if (flags & TQ_NOALLOC)
		return (NULL);

	/* Hit maximum taskq_ent_t pool size */
	if (tq->tq_nalloc >= tq->tq_maxalloc) {
		if (flags & TQ_NOSLEEP)
			return (NULL);

		/*
		 * Sleep periodically polling the free list for an available
		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
		 * but we cannot block forever waiting for a taskq_ent_t to
		 * show up in the free list, otherwise a deadlock can happen.
		 *
		 * Therefore, we need to allocate a new task even if the number
		 * of allocated tasks is above tq->tq_maxalloc, but we still
		 * end up delaying the task allocation by one second, thereby
		 * throttling the task dispatch rate.
		 */
		spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
		schedule_timeout_interruptible(HZ / 100);
		spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
		    tq->tq_lock_class);
		if (count < 100) {
			count++;
			goto retry;
		}
	}

	spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
	t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
	spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);

	if (t) {
		taskq_init_ent(t);
		tq->tq_nalloc++;
	}

	return (t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	ASSERT(tq);
	ASSERT(t);
	ASSERT(list_empty(&t->tqent_list));
	ASSERT(!timer_pending(&t->tqent_timer));

	kmem_free(t, sizeof (taskq_ent_t));
	tq->tq_nalloc--;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
	ASSERT(tq);
	ASSERT(t);
	ASSERT(list_empty(&t->tqent_list));

	/* Wake tasks blocked in taskq_wait_id() */
	wake_up_all(&t->tqent_waitq);

	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_id = TASKQID_INVALID;
		t->tqent_func = NULL;
		t->tqent_arg = NULL;
		t->tqent_flags = 0;

		list_add_tail(&t->tqent_list, &tq->tq_free_list);
		TQSTAT_INC(tq, entries_free);
	} else {
		task_free(tq, t);
	}
}

/*
 * When a delayed task timer expires remove it from the delay list and
 * add it to the priority list in order for immediate processing.
 */
static void
task_expire_impl(taskq_ent_t *t)
{
	taskq_ent_t *w;
	taskq_t *tq = t->tqent_taskq;
	struct list_head *l = NULL;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
		ASSERT(list_empty(&t->tqent_list));
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		return;
	}

	t->tqent_birth = jiffies;
	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);

	/*
	 * The priority list must be maintained in strict task id order
	 * from lowest to highest for lowest_id to be easily calculable.
	 */
	list_del(&t->tqent_list);
	list_for_each_prev(l, &tq->tq_prio_list) {
		w = list_entry(l, taskq_ent_t, tqent_list);
		if (w->tqent_id < t->tqent_id) {
			list_add(&t->tqent_list, l);
			break;
		}
	}
	if (l == &tq->tq_prio_list)
		list_add(&t->tqent_list, &tq->tq_prio_list);

	spin_unlock_irqrestore(&tq->tq_lock, flags);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_delayed_requeued);
}

static void
task_expire(struct timer_list *tl)
{
	struct timer_list *tmr = (struct timer_list *)tl;
	taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
	task_expire_impl(t);
}

/*
 * Returns the lowest incomplete taskqid_t. The taskqid_t may
 * be queued on the pending list, on the priority list, on the
 * delay list, or on the work list currently being handled, but
 * it is not 100% complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
	taskqid_t lowest_id = tq->tq_next_id;
	taskq_ent_t *t;
	taskq_thread_t *tqt;

	if (!list_empty(&tq->tq_pend_list)) {
		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_prio_list)) {
		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_delay_list)) {
		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_active_list)) {
		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
		    tqt_active_list);
		ASSERT(tqt->tqt_id != TASKQID_INVALID);
		lowest_id = MIN(lowest_id, tqt->tqt_id);
	}

	return (lowest_id);
}

/*
 * Insert a task into a list keeping the list sorted by increasing taskqid.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
	taskq_thread_t *w;
	struct list_head *l = NULL;

	ASSERT(tq);
	ASSERT(tqt);

	list_for_each_prev(l, &tq->tq_active_list) {
		w = list_entry(l, taskq_thread_t, tqt_active_list);
		if (w->tqt_id < tqt->tqt_id) {
			list_add(&tqt->tqt_active_list, l);
			break;
		}
	}
	if (l == &tq->tq_active_list)
		list_add(&tqt->tqt_active_list, &tq->tq_active_list);
}

/*
 * Find and return a task from the given list if it exists. The list
 * must be in lowest to highest task id order.
 */
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
	struct list_head *l = NULL;
	taskq_ent_t *t;

	list_for_each(l, lh) {
		t = list_entry(l, taskq_ent_t, tqent_list);

		if (t->tqent_id == id)
			return (t);

		if (t->tqent_id > id)
			break;
	}

	return (NULL);
}

/*
 * Find an already dispatched task given the task id regardless of what
 * state it is in. If a task is still pending it will be returned.
 * If a task is executing, then -EBUSY will be returned instead.
 * If the task has already been run then NULL is returned.
 */
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id)
{
	taskq_thread_t *tqt;
	struct list_head *l = NULL;
	taskq_ent_t *t;

	t = taskq_find_list(tq, &tq->tq_delay_list, id);
	if (t)
		return (t);

	t = taskq_find_list(tq, &tq->tq_prio_list, id);
	if (t)
		return (t);

	t = taskq_find_list(tq, &tq->tq_pend_list, id);
	if (t)
		return (t);

	list_for_each(l, &tq->tq_active_list) {
		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
		if (tqt->tqt_id == id) {
			/*
			 * Instead of returning tqt_task, we just return a non
			 * NULL value to prevent misuse, since tqt_task only
			 * has two valid fields.
			 */
			return (ERR_PTR(-EBUSY));
		}
	}

	return (NULL);
}

/*
 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
 * taskq_wait() functions below.
 *
 * Taskq waiting is accomplished by tracking the lowest outstanding task
 * id and the next available task id. As tasks are dispatched they are
 * added to the tail of the pending, priority, or delay lists. As worker
 * threads become available the tasks are removed from the heads of these
 * lists and linked to the worker threads. This ensures the lists are
 * kept sorted by lowest to highest task id.
 *
 * Therefore the lowest outstanding task id can be quickly determined by
 * checking the head item from all of these lists. This value is stored
 * with the taskq as the lowest id. It only needs to be recalculated when
 * either the task with the current lowest id completes or is canceled.
 *
 * By blocking until the lowest task id exceeds the passed task id the
 * taskq_wait_outstanding() function can be easily implemented. Similarly,
 * by blocking until the lowest task id matches the next task id taskq_wait()
 * can be implemented.
 *
 * Callers should be aware that when there are multiple worker threads it
 * is possible for larger task ids to complete before smaller ones. Also
 * when the taskq contains delay tasks with small task ids callers may
 * block for a considerable length of time waiting for them to expire and
 * execute.
 */
static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (taskq_find(tq, id) == NULL);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait_id() function blocks until the passed task id completes.
 * This does not guarantee that all lower task ids have completed.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_id);

static int
taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (id < tq->tq_lowest_id);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait_outstanding() function will block until all tasks with a
 * lower taskqid than the passed 'id' have been completed. Note that all
 * task id's are assigned monotonically at dispatch time. Zero may be
 * passed for the id to indicate that all tasks dispatched up to this point,
 * but not after, should be waited for.
 */
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	id = id ? id : tq->tq_next_id - 1;
	wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_outstanding);

static int
taskq_wait_check(taskq_t *tq)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (tq->tq_lowest_id == tq->tq_next_id);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait() function will block until the taskq is empty.
 * This means that if a taskq re-dispatches work to itself taskq_wait()
 * callers will block indefinitely.
 */
void
taskq_wait(taskq_t *tq)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
}
EXPORT_SYMBOL(taskq_wait);

int
taskq_member(taskq_t *tq, kthread_t *t)
{
	return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
}
EXPORT_SYMBOL(taskq_member);

taskq_t *
taskq_of_curthread(void)
{
	return (tsd_get(taskq_tsd));
}
EXPORT_SYMBOL(taskq_of_curthread);
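/*
 * Editor's note: the block below is an illustrative sketch added for this
 * rewrite and is not part of the original file (it is compiled out with
 * #if 0). It shows how a consumer would typically pair taskq_dispatch()
 * with the wait primitives above; the names my_func and my_arg are
 * hypothetical.
 */
#if 0
static void
my_func(void *my_arg)
{
	/* do the deferred work described by my_arg */
}

static void
example_dispatch_and_wait(taskq_t *tq, void *my_arg)
{
	/* Dispatch one task and wait for that specific task id. */
	taskqid_t id = taskq_dispatch(tq, my_func, my_arg, TQ_SLEEP);
	if (id != TASKQID_INVALID)
		taskq_wait_id(tq, id);

	/* Or wait for everything dispatched so far (0 means "up to now"). */
	taskq_wait_outstanding(tq, 0);

	/* Or block until the taskq is completely empty. */
	taskq_wait(tq);
}
#endif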
/*
 * Cancel an already dispatched task given the task id. Still pending tasks
 * will be immediately canceled, and if the task is active the function will
 * block until it completes. Preallocated tasks which are canceled must be
 * freed by the caller.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id)
{
	taskq_ent_t *t;
	int rc = ENOENT;
	unsigned long flags;

	ASSERT(tq);

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	t = taskq_find(tq, id);
	if (t && t != ERR_PTR(-EBUSY)) {
		list_del_init(&t->tqent_list);
		TQSTAT_DEC_LIST(tq, t);
		TQSTAT_DEC(tq, tasks_total);

		t->tqent_flags |= TQENT_FLAG_CANCEL;
		TQSTAT_INC(tq, tasks_cancelled);

		/*
		 * When canceling the lowest outstanding task id we
		 * must recalculate the new lowest outstanding id.
		 */
		if (tq->tq_lowest_id == t->tqent_id) {
			tq->tq_lowest_id = taskq_lowest_id(tq);
			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
		}

		/*
		 * The task_expire() function takes the tq->tq_lock so drop
		 * the lock before synchronously cancelling the timer.
		 */
		if (timer_pending(&t->tqent_timer)) {
			spin_unlock_irqrestore(&tq->tq_lock, flags);
			timer_delete_sync(&t->tqent_timer);
			spin_lock_irqsave_nested(&tq->tq_lock, flags,
			    tq->tq_lock_class);
		}

		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
			task_done(tq, t);

		rc = 0;
	}
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	if (t == ERR_PTR(-EBUSY)) {
		taskq_wait_id(tq, id);
		rc = EBUSY;
	}

	return (rc);
}
EXPORT_SYMBOL(taskq_cancel_id);

static int taskq_thread_spawn(taskq_t *tq);

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *t;
	taskqid_t rc = TASKQID_INVALID;
	unsigned long irqflags;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	/* Do not queue the task unless there is an idle thread for it */
	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
		/* Dynamic taskq may be able to spawn another thread */
		if (taskq_thread_spawn(tq) == 0)
			goto out;
	}

	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
		goto out;

	spin_lock(&t->tqent_lock);

	/* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
	if (flags & TQ_NOQUEUE) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add(&t->tqent_list, &tq->tq_prio_list);
	/* Queue to the priority list instead of the pending list */
	} else if (flags & TQ_FRONT) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	} else {
		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
	}
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.function = NULL;
	t->tqent_timer.expires = 0;

	t->tqent_birth = jiffies;
	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_dispatched);

	/* Spawn additional taskq threads if required. */
	if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	return (rc);
}
EXPORT_SYMBOL(taskq_dispatch);

taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskqid_t rc = TASKQID_INVALID;
	taskq_ent_t *t;
	unsigned long irqflags;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
		goto out;

	spin_lock(&t->tqent_lock);

	/* Queue to the delay list for subsequent execution */
	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
	TQENT_SET_LIST(t, TQENT_LIST_DELAY);
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.function = task_expire;
	t->tqent_timer.expires = (unsigned long)expire_time;
	add_timer(&t->tqent_timer);

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);

	TQSTAT_INC(tq, tasks_dispatched_delayed);

	/* Spawn additional taskq threads if required. */
	if (tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	return (rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);
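/*
 * Editor's note: illustrative sketch, not part of the original file
 * (compiled out with #if 0). The expire_time argument above is an absolute
 * time in jiffies, so a caller wanting "run this in roughly five seconds"
 * would do something like the following; my_func and my_arg are
 * hypothetical, as in the earlier sketch.
 */
#if 0
static void
example_dispatch_delayed(taskq_t *tq, void *my_arg)
{
	taskqid_t id;

	id = taskq_dispatch_delay(tq, my_func, my_arg, TQ_SLEEP,
	    jiffies + 5 * HZ);

	/* A still-pending delayed task can be cancelled before it fires. */
	if (id != TASKQID_INVALID)
		(void) taskq_cancel_id(tq, id);
}
#endif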
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	unsigned long irqflags;
	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
	    tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
		t->tqent_id = TASKQID_INVALID;
		goto out;
	}

	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
		/* Dynamic taskq may be able to spawn another thread */
		if (taskq_thread_spawn(tq) == 0)
			goto out;
		flags |= TQ_FRONT;
	}

	spin_lock(&t->tqent_lock);

	/*
	 * Make sure the entry is not on some other taskq; it is important to
	 * ASSERT() under lock
	 */
	ASSERT(taskq_empty_ent(t));

	/*
	 * Mark it as a prealloc'd task. This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;

	/* Queue to the priority list instead of the pending list */
	if (flags & TQ_FRONT) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	} else {
		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
	}
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;

	t->tqent_birth = jiffies;
	DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t);

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_dispatched);

	/* Spawn additional taskq threads if required. */
	if (tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
}
EXPORT_SYMBOL(taskq_dispatch_ent);

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (list_empty(&t->tqent_list));
}
EXPORT_SYMBOL(taskq_empty_ent);

void
taskq_init_ent(taskq_ent_t *t)
{
	spin_lock_init(&t->tqent_lock);
	init_waitqueue_head(&t->tqent_waitq);
	timer_setup(&t->tqent_timer, NULL, 0);
	INIT_LIST_HEAD(&t->tqent_list);
	t->tqent_id = 0;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
	t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);
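/*
 * Editor's note: illustrative sketch, not part of the original file
 * (compiled out with #if 0). A caller that cannot afford a dispatch-time
 * allocation embeds a taskq_ent_t in its own object, initializes it once,
 * and then uses taskq_dispatch_ent(); the struct and names here are
 * hypothetical.
 */
#if 0
struct my_work {
	taskq_ent_t	mw_ent;
	int		mw_data;
};

static void
my_work_func(void *arg)
{
	struct my_work *mw = arg;
	/* consume mw->mw_data */
}

static void
example_dispatch_prealloc(taskq_t *tq, struct my_work *mw)
{
	taskq_init_ent(&mw->mw_ent);

	/* No allocation can fail here; the entry is caller-owned. */
	taskq_dispatch_ent(tq, my_work_func, mw, TQ_SLEEP, &mw->mw_ent);
}
#endif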
/*
 * Return the next pending task, preference is given to tasks on the
 * priority list which were dispatched with TQ_FRONT.
 */
static taskq_ent_t *
taskq_next_ent(taskq_t *tq)
{
	struct list_head *list;

	if (!list_empty(&tq->tq_prio_list))
		list = &tq->tq_prio_list;
	else if (!list_empty(&tq->tq_pend_list))
		list = &tq->tq_pend_list;
	else
		return (NULL);

	return (list_entry(list->next, taskq_ent_t, tqent_list));
}

/*
 * Spawns a new thread for the specified taskq.
 */
static void
taskq_thread_spawn_task(void *arg)
{
	taskq_t *tq = (taskq_t *)arg;
	unsigned long flags;

	if (taskq_thread_create(tq) == NULL) {
		/* restore spawning count if failed */
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
		tq->tq_nspawn--;
		spin_unlock_irqrestore(&tq->tq_lock, flags);
	}
}

/*
 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
 * current number of threads is insufficient to handle the pending tasks.
 * These new threads must be created by the dedicated dynamic_taskq to avoid
 * deadlocks between thread creation and memory reclaim. The system_taskq
 * which is also a dynamic taskq cannot be safely used for this.
 */
static int
taskq_thread_spawn(taskq_t *tq)
{
	int spawning = 0;

	if (!(tq->tq_flags & TASKQ_DYNAMIC))
		return (0);

	tq->lastspawnstop = jiffies;
	if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
	    (tq->tq_flags & TASKQ_ACTIVE)) {
		spawning = (++tq->tq_nspawn);
		taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
		    tq, TQ_NOSLEEP);
	}

	return (spawning);
}

/*
 * Threads in a dynamic taskq may exit once there is no more work to do.
 * To prevent threads from being created and destroyed too often limit
 * the exit rate to one per spl_taskq_thread_timeout_ms.
 *
 * The first thread in the thread list is treated as the primary thread.
 * There is nothing special about the primary thread but in order to avoid
 * all the taskq pids from changing we opt to make it long running.
 */
static int
taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
{
	ASSERT(!taskq_next_ent(tq));
	if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
		return (0);
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		return (1);
	if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
	    tqt_thread_list) == tqt)
		return (0);
	ASSERT3U(tq->tq_nthreads, >, 1);
	if (tq->tq_nspawn != 0)
		return (0);
	if (time_before(jiffies, tq->lastspawnstop +
	    msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
		return (0);
	tq->lastspawnstop = jiffies;
	return (1);
}

static int
taskq_thread(void *args)
{
	DECLARE_WAITQUEUE(wait, current);
	sigset_t blocked;
	taskq_thread_t *tqt = args;
	taskq_t *tq;
	taskq_ent_t *t;
	int seq_tasks = 0;
	unsigned long flags;
	taskq_ent_t dup_task = {};

	ASSERT(tqt);
	ASSERT(tqt->tqt_tq);
	tq = tqt->tqt_tq;
	current->flags |= PF_NOFREEZE;

	(void) spl_fstrans_mark();

	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	tsd_set(taskq_tsd, tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	/*
	 * If we are dynamically spawned, decrease spawning count. Note that
	 * we could be created during taskq_create, in which case we shouldn't
	 * do the decrement. But it's fine because taskq_create will reset
	 * tq_nspawn later.
	 */
	if (tq->tq_flags & TASKQ_DYNAMIC)
		tq->tq_nspawn--;

	/* Immediately exit if more threads than allowed were created. */
	if (tq->tq_nthreads >= tq->tq_maxthreads)
		goto error;

	tq->tq_nthreads++;
	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
	wake_up(&tq->tq_wait_waitq);
	set_current_state(TASK_INTERRUPTIBLE);

	TQSTAT_INC(tq, threads_total);

	while (!kthread_should_stop()) {

		if (list_empty(&tq->tq_pend_list) &&
		    list_empty(&tq->tq_prio_list)) {

			if (taskq_thread_should_stop(tq, tqt))
				break;

			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
			spin_unlock_irqrestore(&tq->tq_lock, flags);

			TQSTAT_INC(tq, thread_sleeps);
			TQSTAT_INC(tq, threads_idle);

			schedule();
			seq_tasks = 0;

			TQSTAT_DEC(tq, threads_idle);
			TQSTAT_INC(tq, thread_wakeups);

			spin_lock_irqsave_nested(&tq->tq_lock, flags,
			    tq->tq_lock_class);
			remove_wait_queue(&tq->tq_work_waitq, &wait);
		} else {
			__set_current_state(TASK_RUNNING);
		}

		if ((t = taskq_next_ent(tq)) != NULL) {
			list_del_init(&t->tqent_list);
			TQSTAT_DEC_LIST(tq, t);
			TQSTAT_DEC(tq, tasks_total);

			/*
			 * A TQENT_FLAG_PREALLOC task may be reused or freed
			 * during the task function call. Store tqent_id and
			 * tqent_flags here.
			 *
			 * Also use an on stack taskq_ent_t for tqt_task
			 * assignment in this case; we want to make sure
			 * to duplicate all fields, so the values are
			 * correct when it's accessed via DTRACE_PROBE*.
			 */
			tqt->tqt_id = t->tqent_id;
			tqt->tqt_flags = t->tqent_flags;

			if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
				dup_task = *t;
				t = &dup_task;
			}
			tqt->tqt_task = t;

			taskq_insert_in_order(tq, tqt);
			tq->tq_nactive++;
			spin_unlock_irqrestore(&tq->tq_lock, flags);

			TQSTAT_INC(tq, threads_active);
			DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t);

			/* Perform the requested task */
			t->tqent_func(t->tqent_arg);

			DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t);

			TQSTAT_DEC(tq, threads_active);
			if ((t->tqent_flags & TQENT_LIST_MASK) ==
			    TQENT_LIST_PENDING)
				TQSTAT_INC(tq, tasks_executed_normal);
			else
				TQSTAT_INC(tq, tasks_executed_priority);
			TQSTAT_INC(tq, tasks_executed);

			spin_lock_irqsave_nested(&tq->tq_lock, flags,
			    tq->tq_lock_class);

			tq->tq_nactive--;
			list_del_init(&tqt->tqt_active_list);
			tqt->tqt_task = NULL;

			/* For prealloc'd tasks, we don't free anything. */
			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
				task_done(tq, t);

			/*
			 * When the current lowest outstanding taskqid is
			 * done calculate the new lowest outstanding id
			 */
			if (tq->tq_lowest_id == tqt->tqt_id) {
				tq->tq_lowest_id = taskq_lowest_id(tq);
				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
			}

			/* Spawn additional taskq threads if required. */
			if ((++seq_tasks) > spl_taskq_thread_sequential &&
			    taskq_thread_spawn(tq))
				seq_tasks = 0;

			tqt->tqt_id = TASKQID_INVALID;
			tqt->tqt_flags = 0;
			wake_up_all(&tq->tq_wait_waitq);
		} else
			TQSTAT_INC(tq, thread_wakeups_nowork);

		set_current_state(TASK_INTERRUPTIBLE);

	}

	__set_current_state(TASK_RUNNING);
	tq->tq_nthreads--;
	list_del_init(&tqt->tqt_thread_list);

	TQSTAT_DEC(tq, threads_total);
	TQSTAT_INC(tq, threads_destroyed);

error:
	kmem_free(tqt, sizeof (taskq_thread_t));
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	tsd_set(taskq_tsd, NULL);
	thread_exit();

	return (0);
}

static taskq_thread_t *
taskq_thread_create(taskq_t *tq)
{
	static int last_used_cpu = 0;
	taskq_thread_t *tqt;

	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
	INIT_LIST_HEAD(&tqt->tqt_thread_list);
	INIT_LIST_HEAD(&tqt->tqt_active_list);
	tqt->tqt_tq = tq;
	tqt->tqt_id = TASKQID_INVALID;

	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
	    "%s", tq->tq_name);
	if (tqt->tqt_thread == NULL) {
		kmem_free(tqt, sizeof (taskq_thread_t));
		return (NULL);
	}

	if (spl_taskq_thread_bind) {
		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
		kthread_bind(tqt->tqt_thread, last_used_cpu);
	}

	if (spl_taskq_thread_priority)
		set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));

	wake_up_process(tqt->tqt_thread);

	TQSTAT_INC(tq, threads_created);

	return (tqt);
}

static void
taskq_stats_init(taskq_t *tq)
{
	taskq_sums_t *tqs = &tq->tq_sums;
	wmsum_init(&tqs->tqs_threads_active, 0);
	wmsum_init(&tqs->tqs_threads_idle, 0);
	wmsum_init(&tqs->tqs_threads_total, 0);
	wmsum_init(&tqs->tqs_tasks_pending, 0);
	wmsum_init(&tqs->tqs_tasks_priority, 0);
	wmsum_init(&tqs->tqs_tasks_total, 0);
	wmsum_init(&tqs->tqs_tasks_delayed, 0);
	wmsum_init(&tqs->tqs_entries_free, 0);
	wmsum_init(&tqs->tqs_threads_created, 0);
	wmsum_init(&tqs->tqs_threads_destroyed, 0);
	wmsum_init(&tqs->tqs_tasks_dispatched, 0);
	wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
	wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
	wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
	wmsum_init(&tqs->tqs_tasks_executed, 0);
	wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
	wmsum_init(&tqs->tqs_tasks_cancelled, 0);
	wmsum_init(&tqs->tqs_thread_wakeups, 0);
	wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
	wmsum_init(&tqs->tqs_thread_sleeps, 0);
}

static void
taskq_stats_fini(taskq_t *tq)
{
	taskq_sums_t *tqs = &tq->tq_sums;
	wmsum_fini(&tqs->tqs_threads_active);
	wmsum_fini(&tqs->tqs_threads_idle);
	wmsum_fini(&tqs->tqs_threads_total);
	wmsum_fini(&tqs->tqs_tasks_pending);
	wmsum_fini(&tqs->tqs_tasks_priority);
	wmsum_fini(&tqs->tqs_tasks_total);
	wmsum_fini(&tqs->tqs_tasks_delayed);
	wmsum_fini(&tqs->tqs_entries_free);
	wmsum_fini(&tqs->tqs_threads_created);
	wmsum_fini(&tqs->tqs_threads_destroyed);
	wmsum_fini(&tqs->tqs_tasks_dispatched);
	wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
	wmsum_fini(&tqs->tqs_tasks_executed_normal);
	wmsum_fini(&tqs->tqs_tasks_executed_priority);
	wmsum_fini(&tqs->tqs_tasks_executed);
	wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
	wmsum_fini(&tqs->tqs_tasks_cancelled);
	wmsum_fini(&tqs->tqs_thread_wakeups);
	wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
	wmsum_fini(&tqs->tqs_thread_sleeps);
}

static int
taskq_kstats_update(kstat_t *ksp, int rw)
{
	if (rw == KSTAT_WRITE)
		return (EACCES);

	taskq_t *tq = ksp->ks_private;
	taskq_kstats_t *tqks = ksp->ks_data;

	tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
	tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
	tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;

	taskq_sums_t *tqs = &tq->tq_sums;

	tqks->tqks_threads_active.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_active);
	tqks->tqks_threads_idle.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_idle);
	tqks->tqks_threads_total.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_total);
	tqks->tqks_tasks_pending.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_pending);
	tqks->tqks_tasks_priority.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_priority);
	tqks->tqks_tasks_total.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_total);
	tqks->tqks_tasks_delayed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_delayed);
	tqks->tqks_entries_free.value.ui64 =
	    wmsum_value(&tqs->tqs_entries_free);
	tqks->tqks_threads_created.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_created);
	tqks->tqks_threads_destroyed.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_destroyed);
	tqks->tqks_tasks_dispatched.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_dispatched);
	tqks->tqks_tasks_dispatched_delayed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
	tqks->tqks_tasks_executed_normal.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed_normal);
	tqks->tqks_tasks_executed_priority.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed_priority);
	tqks->tqks_tasks_executed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed);
	tqks->tqks_tasks_delayed_requeued.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_delayed_requeued);
	tqks->tqks_tasks_cancelled.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_cancelled);
	tqks->tqks_thread_wakeups.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_wakeups);
	tqks->tqks_thread_wakeups_nowork.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_wakeups_nowork);
	tqks->tqks_thread_sleeps.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_sleeps);

	return (0);
}

static void
taskq_kstats_init(taskq_t *tq)
{
	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
	snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);

	kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
	    KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);

	if (ksp == NULL)
		return;

	ksp->ks_private = tq;
	ksp->ks_update = taskq_kstats_update;
	ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
	memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
	kstat_install(ksp);

	tq->tq_ksp = ksp;
}

static void
taskq_kstats_fini(taskq_t *tq)
{
	if (tq->tq_ksp == NULL)
		return;

	kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
	kstat_delete(tq->tq_ksp);

	tq->tq_ksp = NULL;
}

taskq_t *
taskq_create(const char *name, int threads_arg, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq;
	taskq_thread_t *tqt;
	int count = 0, rc = 0, i;
	unsigned long irqflags;
	int nthreads = threads_arg;

	ASSERT(name != NULL);
	ASSERT(minalloc >= 0);
	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */

	/* Scale the number of threads using nthreads as a percentage */
	if (flags & TASKQ_THREADS_CPU_PCT) {
		ASSERT(nthreads <= 100);
		ASSERT(nthreads >= 0);
		nthreads = MIN(threads_arg, 100);
		nthreads = MAX(nthreads, 0);
		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
	}

	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
	if (tq == NULL)
		return (NULL);

	tq->tq_hp_support = B_FALSE;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		tq->tq_hp_support = B_TRUE;
		if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
		    &tq->tq_hp_cb_node) != 0) {
			kmem_free(tq, sizeof (*tq));
			return (NULL);
		}
	}

	spin_lock_init(&tq->tq_lock);
	INIT_LIST_HEAD(&tq->tq_thread_list);
	INIT_LIST_HEAD(&tq->tq_active_list);
	tq->tq_name = kmem_strdup(name);
	tq->tq_nactive = 0;
	tq->tq_nthreads = 0;
	tq->tq_nspawn = 0;
	tq->tq_maxthreads = nthreads;
	tq->tq_cpu_pct = threads_arg;
	tq->tq_pri = pri;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_nalloc = 0;
	tq->tq_flags = (flags | TASKQ_ACTIVE);
	tq->tq_next_id = TASKQID_INITIAL;
	tq->tq_lowest_id = TASKQID_INITIAL;
	tq->lastspawnstop = jiffies;
	INIT_LIST_HEAD(&tq->tq_free_list);
	INIT_LIST_HEAD(&tq->tq_pend_list);
	INIT_LIST_HEAD(&tq->tq_prio_list);
	INIT_LIST_HEAD(&tq->tq_delay_list);
	init_waitqueue_head(&tq->tq_work_waitq);
	init_waitqueue_head(&tq->tq_wait_waitq);
	tq->tq_lock_class = TQ_LOCK_GENERAL;
	INIT_LIST_HEAD(&tq->tq_taskqs);
	taskq_stats_init(tq);

	if (flags & TASKQ_PREPOPULATE) {
		spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
		    tq->tq_lock_class);

		for (i = 0; i < minalloc; i++)
			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
			    &irqflags));

		spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	}

	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
		nthreads = 1;

	for (i = 0; i < nthreads; i++) {
		tqt = taskq_thread_create(tq);
		if (tqt == NULL)
			rc = 1;
		else
			count++;
	}

	/* Wait for all threads to be started before potential destroy */
	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
	/*
	 * taskq_thread might have touched nspawn, but we don't want them to
	 * because they're not dynamically spawned. So we reset it to 0
	 */
	tq->tq_nspawn = 0;

	if (rc) {
		taskq_destroy(tq);
		return (NULL);
	}

	down_write(&tq_list_sem);
	tq->tq_instance = taskq_find_by_name(name) + 1;
	list_add_tail(&tq->tq_taskqs, &tq_list);
	up_write(&tq_list_sem);

	/* Install kstats late, because the name includes tq_instance */
	taskq_kstats_init(tq);

	return (tq);
}
EXPORT_SYMBOL(taskq_create);

void
taskq_destroy(taskq_t *tq)
{
	struct task_struct *thread;
	taskq_thread_t *tqt;
	taskq_ent_t *t;
	unsigned long flags;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	tq->tq_flags &= ~TASKQ_ACTIVE;
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	if (tq->tq_hp_support) {
		VERIFY0(cpuhp_state_remove_instance_nocalls(
		    spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
	}

	/*
	 * When TASKQ_ACTIVE is clear new tasks may not be added nor may
	 * new worker threads be spawned for dynamic taskq.
	 */
	if (dynamic_taskq != NULL)
		taskq_wait_outstanding(dynamic_taskq, 0);

	taskq_wait(tq);

	taskq_kstats_fini(tq);

	/* remove taskq from global list used by the kstats */
	down_write(&tq_list_sem);
	list_del(&tq->tq_taskqs);
	up_write(&tq_list_sem);

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	/* wait for spawning threads to insert themselves to the list */
	while (tq->tq_nspawn) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		schedule_timeout_interruptible(1);
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
	}

	/*
	 * Signal each thread to exit and block until it does. Each thread
	 * is responsible for removing itself from the list and freeing its
	 * taskq_thread_t. This allows for idle threads to opt to remove
	 * themselves from the taskq. They can be recreated as needed.
	 */
	while (!list_empty(&tq->tq_thread_list)) {
		tqt = list_entry(tq->tq_thread_list.next,
		    taskq_thread_t, tqt_thread_list);
		thread = tqt->tqt_thread;
		spin_unlock_irqrestore(&tq->tq_lock, flags);

		kthread_stop(thread);

		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
	}

	while (!list_empty(&tq->tq_free_list)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

		list_del_init(&t->tqent_list);
		task_free(tq, t);
	}

	ASSERT0(tq->tq_nthreads);
	ASSERT0(tq->tq_nalloc);
	ASSERT0(tq->tq_nspawn);
	ASSERT(list_empty(&tq->tq_thread_list));
	ASSERT(list_empty(&tq->tq_active_list));
	ASSERT(list_empty(&tq->tq_free_list));
	ASSERT(list_empty(&tq->tq_pend_list));
	ASSERT(list_empty(&tq->tq_prio_list));
	ASSERT(list_empty(&tq->tq_delay_list));

	spin_unlock_irqrestore(&tq->tq_lock, flags);

	taskq_stats_fini(tq);
	kmem_strfree(tq->tq_name);
	kmem_free(tq, sizeof (taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);
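/*
 * Editor's note: illustrative sketch, not part of the original file
 * (compiled out with #if 0). It shows a typical private taskq being created
 * as a percentage of the online CPUs and later torn down; the name
 * "my_taskq" and the minalloc/maxalloc values are hypothetical.
 */
#if 0
static taskq_t *
example_create(void)
{
	/* 75% of online CPUs, 50 pre-allocated entries, at most 100. */
	return (taskq_create("my_taskq", 75, maxclsyspri, 50, 100,
	    TASKQ_PREPOPULATE | TASKQ_THREADS_CPU_PCT | TASKQ_DYNAMIC));
}

static void
example_destroy(taskq_t *tq)
{
	/* Drains all dispatched tasks before freeing the taskq. */
	taskq_destroy(tq);
}
#endif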
/*
 * Create a taskq with a specified number of pool threads. Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool. The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	taskq_thread_t *tqt;
	int i = 0;
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	/* taskq_create spawns all the threads before returning */
	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
		kthreads[i] = tqt->tqt_thread;
		i++;
	}

	ASSERT3S(i, ==, nthreads);
	*ktpp = kthreads;

	return (tq);
}
EXPORT_SYMBOL(taskq_create_synced);

static kstat_t *taskq_summary_ksp = NULL;

static int
spl_taskq_kstat_headers(char *buf, size_t size)
{
	size_t n = snprintf(buf, size,
	    "%-20s | %-17s | %-23s\n"
	    "%-20s | %-17s | %-23s\n"
	    "%-20s | %-17s | %-23s\n",
	    "", "threads", "tasks on queue",
	    "taskq name", "tot [act idl] max", " pend [ norm high] dly",
	    "--------------------", "-----------------",
	    "-----------------------");
	return (n >= size ? ENOMEM : 0);
}

static int
spl_taskq_kstat_data(char *buf, size_t size, void *data)
{
	struct list_head *tql = NULL;
	taskq_t *tq;
	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
	char threads[25];
	char tasks[30];
	size_t n;
	int err = 0;

	down_read(&tq_list_sem);
	list_for_each_prev(tql, &tq_list) {
		tq = list_entry(tql, taskq_t, tq_taskqs);

		mutex_enter(tq->tq_ksp->ks_lock);
		taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
		taskq_kstats_t *tqks = tq->tq_ksp->ks_data;

		snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
		    tq->tq_instance);
		snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
		    tqks->tqks_threads_total.value.ui64,
		    tqks->tqks_threads_active.value.ui64,
		    tqks->tqks_threads_idle.value.ui64,
		    tqks->tqks_threads_max.value.ui64);
		snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
		    tqks->tqks_tasks_total.value.ui64,
		    tqks->tqks_tasks_pending.value.ui64,
		    tqks->tqks_tasks_priority.value.ui64,
		    tqks->tqks_tasks_delayed.value.ui64);

		mutex_exit(tq->tq_ksp->ks_lock);

		n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
		    name, threads, tasks);
		if (n >= size) {
			err = ENOMEM;
			break;
		}

		buf = &buf[n];
		size -= n;
	}

	up_read(&tq_list_sem);

	return (err);
}

static void
spl_taskq_kstat_init(void)
{
	kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
	    KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);

	if (ksp == NULL)
		return;

	ksp->ks_data = (void *)(uintptr_t)1;
	ksp->ks_ndata = 1;
	kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
	    spl_taskq_kstat_data, NULL);
	kstat_install(ksp);

	taskq_summary_ksp = ksp;
}

static void
spl_taskq_kstat_fini(void)
{
	if (taskq_summary_ksp == NULL)
		return;

	kstat_delete(taskq_summary_ksp);
	taskq_summary_ksp = NULL;
}

static unsigned int spl_taskq_kick = 0;

static int
param_set_taskq_kick(const char *val, zfs_kernel_param_t *kp)
{
	int ret;
	taskq_t *tq = NULL;
	taskq_ent_t *t;
	unsigned long flags;

	ret = param_set_uint(val, kp);
	if (ret < 0 || !spl_taskq_kick)
		return (ret);
	/* reset value */
	spl_taskq_kick = 0;

	down_read(&tq_list_sem);
	list_for_each_entry(tq, &tq_list, tq_taskqs) {
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
		/* Check if the first pending is older than 5 seconds */
		t = taskq_next_ent(tq);
		if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
			(void) taskq_thread_spawn(tq);
			printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
			    tq->tq_name, tq->tq_instance);
		}
		spin_unlock_irqrestore(&tq->tq_lock, flags);
	}
	up_read(&tq_list_sem);
	return (ret);
}

module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
    &spl_taskq_kick, 0644);
MODULE_PARM_DESC(spl_taskq_kick,
	"Write nonzero to kick stuck taskqs to spawn more threads");

/*
 * This callback will be called exactly once for each core that comes online,
 * for each dynamic taskq. We attempt to expand taskqs that have
 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
 * time, to correctly determine whether or not to add a thread.
 */
static int
spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
{
	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
	unsigned long flags;
	int err = 0;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		return (err);
	}

	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
	int nthreads = MIN(tq->tq_cpu_pct, 100);
	nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
	tq->tq_maxthreads = nthreads;

	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
	    tq->tq_maxthreads > tq->tq_nthreads) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		taskq_thread_t *tqt = taskq_thread_create(tq);
		if (tqt == NULL)
			err = -1;
		return (err);
	}
	spin_unlock_irqrestore(&tq->tq_lock, flags);
	return (err);
}

/*
 * While we don't support offlining CPUs, it is possible that CPUs will fail
 * to online successfully. We do need to be able to handle this case
 * gracefully.
 */
static int
spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
{
	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
	unsigned long flags;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
	int nthreads = MIN(tq->tq_cpu_pct, 100);
	nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
	tq->tq_maxthreads = nthreads;

	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
	    tq->tq_maxthreads < tq->tq_nthreads) {
		ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
		taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
		    taskq_thread_t, tqt_thread_list);
		struct task_struct *thread = tqt->tqt_thread;
		spin_unlock_irqrestore(&tq->tq_lock, flags);

		kthread_stop(thread);

		return (0);
	}

out:
	spin_unlock_irqrestore(&tq->tq_lock, flags);
	return (0);
}

int
spl_taskq_init(void)
{
	init_rwsem(&tq_list_sem);
	tsd_create(&taskq_tsd, NULL);

	spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
	    "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);

	system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
	if (system_taskq == NULL)
		return (-ENOMEM);

	system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
	if (system_delay_taskq == NULL) {
		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
		taskq_destroy(system_taskq);
		return (-ENOMEM);
	}

	dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
	if (dynamic_taskq == NULL) {
		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
		taskq_destroy(system_taskq);
		taskq_destroy(system_delay_taskq);
		return (-ENOMEM);
	}

	/*
	 * This is used to annotate tq_lock, so
	 * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
	 * does not trigger a lockdep warning re: possible recursive locking
	 */
	dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;

	spl_taskq_kstat_init();

	return (0);
}

void
spl_taskq_fini(void)
{
	spl_taskq_kstat_fini();

	taskq_destroy(dynamic_taskq);
	dynamic_taskq = NULL;

	taskq_destroy(system_delay_taskq);
	system_delay_taskq = NULL;

	taskq_destroy(system_taskq);
	system_taskq = NULL;

	tsd_destroy(&taskq_tsd);

	cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
	spl_taskq_cpuhp_state = 0;
}