Path: sys/contrib/openzfs/module/os/linux/spl/spl-taskq.c
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC.
 * Copyright (C) 2007 The Regents of the University of California.
 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Brian Behlendorf <[email protected]>.
 * UCRL-CODE-235197
 *
 * This file is part of the SPL, Solaris Porting Layer.
 *
 * The SPL is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * The SPL is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with the SPL.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Solaris Porting Layer (SPL) Task Queue Implementation.
 */
/*
 * Copyright (c) 2024, Klara Inc.
 * Copyright (c) 2024, Syneto
 */

#include <sys/timer.h>
#include <sys/taskq.h>
#include <sys/kmem.h>
#include <sys/tsd.h>
#include <sys/time.h>
#include <sys/atomic.h>
#include <sys/kstat.h>
#include <linux/cpuhotplug.h>
#include <linux/mod_compat.h>

/*
 * Linux 6.2 renamed del_timer_sync() to timer_delete_sync(); on older
 * kernels point the new name at the old one.
 */
#ifndef HAVE_TIMER_DELETE_SYNC
#define	timer_delete_sync(t)	del_timer_sync(t)
#endif

typedef struct taskq_kstats {
	/* static values, for completeness */
	kstat_named_t tqks_threads_max;
	kstat_named_t tqks_entry_pool_min;
	kstat_named_t tqks_entry_pool_max;

	/* gauges (inc/dec counters, current value) */
	kstat_named_t tqks_threads_active;
	kstat_named_t tqks_threads_idle;
	kstat_named_t tqks_threads_total;
	kstat_named_t tqks_tasks_pending;
	kstat_named_t tqks_tasks_priority;
	kstat_named_t tqks_tasks_total;
	kstat_named_t tqks_tasks_delayed;
	kstat_named_t tqks_entries_free;

	/* counters (inc only, since taskq creation) */
	kstat_named_t tqks_threads_created;
	kstat_named_t tqks_threads_destroyed;
	kstat_named_t tqks_tasks_dispatched;
	kstat_named_t tqks_tasks_dispatched_delayed;
	kstat_named_t tqks_tasks_executed_normal;
	kstat_named_t tqks_tasks_executed_priority;
	kstat_named_t tqks_tasks_executed;
	kstat_named_t tqks_tasks_delayed_requeued;
	kstat_named_t tqks_tasks_cancelled;
	kstat_named_t tqks_thread_wakeups;
	kstat_named_t tqks_thread_wakeups_nowork;
	kstat_named_t tqks_thread_sleeps;
} taskq_kstats_t;

static taskq_kstats_t taskq_kstats_template = {
	{ "threads_max",		KSTAT_DATA_UINT64 },
	{ "entry_pool_min",		KSTAT_DATA_UINT64 },
	{ "entry_pool_max",		KSTAT_DATA_UINT64 },
	{ "threads_active",		KSTAT_DATA_UINT64 },
	{ "threads_idle",		KSTAT_DATA_UINT64 },
	{ "threads_total",		KSTAT_DATA_UINT64 },
	{ "tasks_pending",		KSTAT_DATA_UINT64 },
	{ "tasks_priority",		KSTAT_DATA_UINT64 },
	{ "tasks_total",		KSTAT_DATA_UINT64 },
	{ "tasks_delayed",		KSTAT_DATA_UINT64 },
	{ "entries_free",		KSTAT_DATA_UINT64 },

	{ "threads_created",		KSTAT_DATA_UINT64 },
	{ "threads_destroyed",		KSTAT_DATA_UINT64 },
	{ "tasks_dispatched",		KSTAT_DATA_UINT64 },
	{ "tasks_dispatched_delayed",	KSTAT_DATA_UINT64 },
	{ "tasks_executed_normal",	KSTAT_DATA_UINT64 },
	{ "tasks_executed_priority",	KSTAT_DATA_UINT64 },
	{ "tasks_executed",		KSTAT_DATA_UINT64 },
	{ "tasks_delayed_requeued",	KSTAT_DATA_UINT64 },
	{ "tasks_cancelled",		KSTAT_DATA_UINT64 },
	{ "thread_wakeups",		KSTAT_DATA_UINT64 },
	{ "thread_wakeups_nowork",	KSTAT_DATA_UINT64 },
	{ "thread_sleeps",		KSTAT_DATA_UINT64 },
};

#define	TQSTAT_INC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, 1)
#define	TQSTAT_DEC(tq, stat)	wmsum_add(&tq->tq_sums.tqs_##stat, -1)

#define	_TQSTAT_MOD_LIST(mod, tq, t) do { \
	switch (t->tqent_flags & TQENT_LIST_MASK) { \
	case TQENT_LIST_NONE: ASSERT(list_empty(&t->tqent_list)); break;\
	case TQENT_LIST_PENDING: mod(tq, tasks_pending); break; \
	case TQENT_LIST_PRIORITY: mod(tq, tasks_priority); break; \
	case TQENT_LIST_DELAY: mod(tq, tasks_delayed); break; \
	} \
} while (0)
#define	TQSTAT_INC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_INC, tq, t)
#define	TQSTAT_DEC_LIST(tq, t)	_TQSTAT_MOD_LIST(TQSTAT_DEC, tq, t)

#define	TQENT_SET_LIST(t, l) \
	t->tqent_flags = (t->tqent_flags & ~TQENT_LIST_MASK) | l;

static int spl_taskq_thread_bind = 0;
module_param(spl_taskq_thread_bind, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default");

static uint_t spl_taskq_thread_timeout_ms = 5000;
module_param(spl_taskq_thread_timeout_ms, uint, 0644);
MODULE_PARM_DESC(spl_taskq_thread_timeout_ms,
	"Minimum idle threads exit interval for dynamic taskqs");

static int spl_taskq_thread_dynamic = 1;
module_param(spl_taskq_thread_dynamic, int, 0444);
MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads");

static int spl_taskq_thread_priority = 1;
module_param(spl_taskq_thread_priority, int, 0644);
MODULE_PARM_DESC(spl_taskq_thread_priority,
	"Allow non-default priority for taskq threads");

static uint_t spl_taskq_thread_sequential = 4;
module_param(spl_taskq_thread_sequential, uint, 0644);
MODULE_PARM_DESC(spl_taskq_thread_sequential,
	"Create new taskq threads after N sequential tasks");

/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq;
EXPORT_SYMBOL(system_taskq);
/* Global dynamic task queue for long delay */
taskq_t *system_delay_taskq;
EXPORT_SYMBOL(system_delay_taskq);

/* Private dedicated taskq for creating new taskq threads on demand. */
static taskq_t *dynamic_taskq;
static taskq_thread_t *taskq_thread_create(taskq_t *);

/* Multi-callback id for cpu hotplugging. */
static int spl_taskq_cpuhp_state;

/* List of all taskqs */
LIST_HEAD(tq_list);
struct rw_semaphore tq_list_sem;
static uint_t taskq_tsd;

static int
task_km_flags(uint_t flags)
{
	if (flags & TQ_NOSLEEP)
		return (KM_NOSLEEP);

	if (flags & TQ_PUSHPAGE)
		return (KM_PUSHPAGE);

	return (KM_SLEEP);
}

/*
 * taskq_find_by_name - Find the largest instance number of a named taskq.
 */
static int
taskq_find_by_name(const char *name)
{
	struct list_head *tql = NULL;
	taskq_t *tq;

	list_for_each_prev(tql, &tq_list) {
		tq = list_entry(tql, taskq_t, tq_taskqs);
		if (strcmp(name, tq->tq_name) == 0)
			return (tq->tq_instance);
	}
	return (-1);
}

/*
 * NOTE: Must be called with tq->tq_lock held, returns a list_t which
 * is not attached to the free, work, or pending taskq lists.
 */
static taskq_ent_t *
task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags)
{
	taskq_ent_t *t;
	int count = 0;

	ASSERT(tq);
retry:
	/* Acquire taskq_ent_t's from free list if available */
	if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
		ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL));
		ASSERT(!timer_pending(&t->tqent_timer));

		list_del_init(&t->tqent_list);
		TQSTAT_DEC(tq, entries_free);
		return (t);
	}

	/* Free list is empty and memory allocations are prohibited */
	if (flags & TQ_NOALLOC)
		return (NULL);

	/* Hit maximum taskq_ent_t pool size */
	if (tq->tq_nalloc >= tq->tq_maxalloc) {
		if (flags & TQ_NOSLEEP)
			return (NULL);

		/*
		 * Sleep periodically polling the free list for an available
		 * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed
		 * but we cannot block forever waiting for a taskq_ent_t to
		 * show up in the free list, otherwise a deadlock can happen.
		 *
		 * Therefore, we need to allocate a new task even if the number
		 * of allocated tasks is above tq->tq_maxalloc, but we still
		 * end up delaying the task allocation by one second, thereby
		 * throttling the task dispatch rate.
		 */
		spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
		schedule_timeout_interruptible(HZ / 100);
		spin_lock_irqsave_nested(&tq->tq_lock, *irqflags,
		    tq->tq_lock_class);
		if (count < 100) {
			count++;
			goto retry;
		}
	}

	spin_unlock_irqrestore(&tq->tq_lock, *irqflags);
	t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags));
	spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class);

	if (t) {
		taskq_init_ent(t);
		tq->tq_nalloc++;
	}

	return (t);
}

/*
 * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t
 * to already be removed from the free, work, or pending taskq lists.
 */
static void
task_free(taskq_t *tq, taskq_ent_t *t)
{
	ASSERT(tq);
	ASSERT(t);
	ASSERT(list_empty(&t->tqent_list));
	ASSERT(!timer_pending(&t->tqent_timer));

	kmem_free(t, sizeof (taskq_ent_t));
	tq->tq_nalloc--;
}

/*
 * NOTE: Must be called with tq->tq_lock held, either destroys the
 * taskq_ent_t if too many exist or moves it to the free list for later use.
 */
static void
task_done(taskq_t *tq, taskq_ent_t *t)
{
	ASSERT(tq);
	ASSERT(t);
	ASSERT(list_empty(&t->tqent_list));

	/* Wake tasks blocked in taskq_wait_id() */
	wake_up_all(&t->tqent_waitq);

	if (tq->tq_nalloc <= tq->tq_minalloc) {
		t->tqent_id = TASKQID_INVALID;
		t->tqent_func = NULL;
		t->tqent_arg = NULL;
		t->tqent_flags = 0;

		list_add_tail(&t->tqent_list, &tq->tq_free_list);
		TQSTAT_INC(tq, entries_free);
	} else {
		task_free(tq, t);
	}
}

/*
 * When a delayed task timer expires remove it from the delay list and
 * add it to the priority list in order for immediate processing.
 */
static void
task_expire_impl(taskq_ent_t *t)
{
	taskq_ent_t *w;
	taskq_t *tq = t->tqent_taskq;
	struct list_head *l = NULL;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (t->tqent_flags & TQENT_FLAG_CANCEL) {
		ASSERT(list_empty(&t->tqent_list));
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		return;
	}

	t->tqent_birth = jiffies;

	/*
	 * The priority list must be maintained in strict task id order
	 * from lowest to highest for lowest_id to be easily calculable.
	 */
	list_del(&t->tqent_list);
	list_for_each_prev(l, &tq->tq_prio_list) {
		w = list_entry(l, taskq_ent_t, tqent_list);
		if (w->tqent_id < t->tqent_id) {
			list_add(&t->tqent_list, l);
			break;
		}
	}
	if (l == &tq->tq_prio_list)
		list_add(&t->tqent_list, &tq->tq_prio_list);

	spin_unlock_irqrestore(&tq->tq_lock, flags);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_delayed_requeued);
}

static void
task_expire(struct timer_list *tl)
{
	struct timer_list *tmr = (struct timer_list *)tl;
	taskq_ent_t *t = from_timer(t, tmr, tqent_timer);
	task_expire_impl(t);
}

/*
 * Returns the lowest incomplete taskqid_t. The taskqid_t may
 * be queued on the pending list, on the priority list, on the
 * delay list, or on the work list currently being handled, but
 * it is not 100% complete yet.
 */
static taskqid_t
taskq_lowest_id(taskq_t *tq)
{
	taskqid_t lowest_id = tq->tq_next_id;
	taskq_ent_t *t;
	taskq_thread_t *tqt;

	if (!list_empty(&tq->tq_pend_list)) {
		t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_prio_list)) {
		t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_delay_list)) {
		t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list);
		lowest_id = MIN(lowest_id, t->tqent_id);
	}

	if (!list_empty(&tq->tq_active_list)) {
		tqt = list_entry(tq->tq_active_list.next, taskq_thread_t,
		    tqt_active_list);
		ASSERT(tqt->tqt_id != TASKQID_INVALID);
		lowest_id = MIN(lowest_id, tqt->tqt_id);
	}

	return (lowest_id);
}

/*
 * Insert a taskq thread into the active list, keeping the list sorted by
 * increasing taskqid of the task each thread is handling.
 */
static void
taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt)
{
	taskq_thread_t *w;
	struct list_head *l = NULL;

	ASSERT(tq);
	ASSERT(tqt);

	list_for_each_prev(l, &tq->tq_active_list) {
		w = list_entry(l, taskq_thread_t, tqt_active_list);
		if (w->tqt_id < tqt->tqt_id) {
			list_add(&tqt->tqt_active_list, l);
			break;
		}
	}
	if (l == &tq->tq_active_list)
		list_add(&tqt->tqt_active_list, &tq->tq_active_list);
}

/*
 * Find and return a task from the given list if it exists.  The list
 * must be in lowest to highest task id order.
 */
static taskq_ent_t *
taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id)
{
	struct list_head *l = NULL;
	taskq_ent_t *t;

	list_for_each(l, lh) {
		t = list_entry(l, taskq_ent_t, tqent_list);

		if (t->tqent_id == id)
			return (t);

		if (t->tqent_id > id)
			break;
	}

	return (NULL);
}

/*
 * Find an already dispatched task given the task id regardless of what
 * state it is in.  If a task is still pending it will be returned.
 * If a task is executing, then -EBUSY will be returned instead.
 * If the task has already been run then NULL is returned.
 */
static taskq_ent_t *
taskq_find(taskq_t *tq, taskqid_t id)
{
	taskq_thread_t *tqt;
	struct list_head *l = NULL;
	taskq_ent_t *t;

	t = taskq_find_list(tq, &tq->tq_delay_list, id);
	if (t)
		return (t);

	t = taskq_find_list(tq, &tq->tq_prio_list, id);
	if (t)
		return (t);

	t = taskq_find_list(tq, &tq->tq_pend_list, id);
	if (t)
		return (t);

	list_for_each(l, &tq->tq_active_list) {
		tqt = list_entry(l, taskq_thread_t, tqt_active_list);
		if (tqt->tqt_id == id) {
			/*
			 * Instead of returning tqt_task, we just return a non
			 * NULL value to prevent misuse, since tqt_task only
			 * has two valid fields.
			 */
			return (ERR_PTR(-EBUSY));
		}
	}

	return (NULL);
}

/*
 * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and
 * taskq_wait() functions below.
 *
 * Taskq waiting is accomplished by tracking the lowest outstanding task
 * id and the next available task id.  As tasks are dispatched they are
 * added to the tail of the pending, priority, or delay lists.  As worker
 * threads become available the tasks are removed from the heads of these
 * lists and linked to the worker threads.  This ensures the lists are
 * kept sorted by lowest to highest task id.
 *
 * Therefore the lowest outstanding task id can be quickly determined by
 * checking the head item from all of these lists.  This value is stored
 * with the taskq as the lowest id.  It only needs to be recalculated when
 * either the task with the current lowest id completes or is canceled.
 *
 * By blocking until the lowest task id exceeds the passed task id the
 * taskq_wait_outstanding() function can be easily implemented.  Similarly,
 * by blocking until the lowest task id matches the next task id taskq_wait()
 * can be implemented.
 *
 * Callers should be aware that when there are multiple worker threads it
 * is possible for larger task ids to complete before smaller ones.  Also
 * when the taskq contains delay tasks with small task ids callers may
 * block for a considerable length of time waiting for them to expire and
 * execute.
 */
static int
taskq_wait_id_check(taskq_t *tq, taskqid_t id)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (taskq_find(tq, id) == NULL);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait_id() function blocks until the passed task id completes.
 * This does not guarantee that all lower task ids have completed.
 */
void
taskq_wait_id(taskq_t *tq, taskqid_t id)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_id);

static int
taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (id < tq->tq_lowest_id);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait_outstanding() function will block until all tasks with a
 * lower taskqid than the passed 'id' have been completed.  Note that all
 * task id's are assigned monotonically at dispatch time.  Zero may be
 * passed for the id to indicate that all tasks dispatched up to this point,
 * but not after, should be waited for.
 */
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
{
	id = id ? id : tq->tq_next_id - 1;
	wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id));
}
EXPORT_SYMBOL(taskq_wait_outstanding);

static int
taskq_wait_check(taskq_t *tq)
{
	int rc;
	unsigned long flags;

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	rc = (tq->tq_lowest_id == tq->tq_next_id);
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	return (rc);
}

/*
 * The taskq_wait() function will block until the taskq is empty.
 * This means that if a taskq re-dispatches work to itself taskq_wait()
 * callers will block indefinitely.
 */
void
taskq_wait(taskq_t *tq)
{
	wait_event(tq->tq_wait_waitq, taskq_wait_check(tq));
}
EXPORT_SYMBOL(taskq_wait);

int
taskq_member(taskq_t *tq, kthread_t *t)
{
	return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t));
}
EXPORT_SYMBOL(taskq_member);

taskq_t *
taskq_of_curthread(void)
{
	return (tsd_get(taskq_tsd));
}
EXPORT_SYMBOL(taskq_of_curthread);

/*
 * Cancel a dispatched task.  Pending tasks are cancelled immediately.
 * If the task is running, behavior depends on the wait parameter:
 * - wait=B_TRUE: Block until the task completes
 * - wait=B_FALSE: Return EBUSY immediately
 *
 * Return values:
 *   0      - Cancelled before execution.  Caller must release resources.
 *   EBUSY  - Task running (wait=B_FALSE only).  Will self-cleanup.
 *   ENOENT - Not found, or completed after waiting.  Already cleaned up.
 *
 * Note: wait=B_TRUE returns ENOENT (not EBUSY) after waiting because
 * the task no longer exists.  This distinguishes "cancelled before run"
 * from "completed naturally" for proper resource management.
 */
int
taskq_cancel_id(taskq_t *tq, taskqid_t id, boolean_t wait)
{
	taskq_ent_t *t;
	int rc = ENOENT;
	unsigned long flags;

	ASSERT(tq);

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	t = taskq_find(tq, id);
	if (t && t != ERR_PTR(-EBUSY)) {
		list_del_init(&t->tqent_list);
		TQSTAT_DEC_LIST(tq, t);
		TQSTAT_DEC(tq, tasks_total);

		t->tqent_flags |= TQENT_FLAG_CANCEL;
		TQSTAT_INC(tq, tasks_cancelled);

		/*
		 * When canceling the lowest outstanding task id we
		 * must recalculate the new lowest outstanding id.
		 */
		if (tq->tq_lowest_id == t->tqent_id) {
			tq->tq_lowest_id = taskq_lowest_id(tq);
			ASSERT3S(tq->tq_lowest_id, >, t->tqent_id);
		}

		/*
		 * The task_expire() function takes the tq->tq_lock so drop
		 * the lock before synchronously cancelling the timer.
		 *
		 * Always call timer_delete_sync() unconditionally.  A
		 * timer_pending() check would be insufficient and unsafe.
		 * When a timer expires, it is immediately dequeued from the
		 * timer wheel (timer_pending() returns FALSE), but the
		 * callback (task_expire) may not run until later.
		 *
		 * The race window:
		 *   1) Timer expires and is dequeued - timer_pending() now
		 *      returns FALSE
		 *   2) task_done() is called below, freeing the task, sets
		 *      tqent_func = NULL and clears flags including CANCEL
		 *   3) Timer callback finally runs, sees no CANCEL flag,
		 *      queues task to prio_list
		 *   4) Worker thread attempts to execute NULL tqent_func
		 *      and panics
		 *
		 * timer_delete_sync() prevents this by ensuring the timer
		 * callback completes before the task is freed.
		 */
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		timer_delete_sync(&t->tqent_timer);
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);

		if (!(t->tqent_flags & TQENT_FLAG_PREALLOC))
			task_done(tq, t);

		rc = 0;
	}
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	if (t == ERR_PTR(-EBUSY)) {
		if (wait) {
			taskq_wait_id(tq, id);
			rc = ENOENT;	/* Completed, no longer exists */
		} else {
			rc = EBUSY;	/* Still running */
		}
	}

	return (rc);
}
EXPORT_SYMBOL(taskq_cancel_id);

static int taskq_thread_spawn(taskq_t *tq);

taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *t;
	taskqid_t rc = TASKQID_INVALID;
	unsigned long irqflags;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	/* Do not queue the task unless there is an idle thread for it */
	ASSERT(tq->tq_nactive <= tq->tq_nthreads);
	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
		/* Dynamic taskq may be able to spawn another thread */
		if (taskq_thread_spawn(tq) == 0)
			goto out;
	}

	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
		goto out;

	spin_lock(&t->tqent_lock);

	/* Queue to the front of the list to enforce TQ_NOQUEUE semantics */
	if (flags & TQ_NOQUEUE) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add(&t->tqent_list, &tq->tq_prio_list);
	/* Queue to the priority list instead of the pending list */
	} else if (flags & TQ_FRONT) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	} else {
		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
	}
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.function = NULL;
	t->tqent_timer.expires = 0;
	t->tqent_birth = jiffies;

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_dispatched);

	/* Spawn additional taskq threads if required. */
	if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	return (rc);
}
EXPORT_SYMBOL(taskq_dispatch);

taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskqid_t rc = TASKQID_INVALID;
	taskq_ent_t *t;
	unsigned long irqflags;

	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	if ((t = task_alloc(tq, flags, &irqflags)) == NULL)
		goto out;

	spin_lock(&t->tqent_lock);

	/* Queue to the delay list for subsequent execution */
	list_add_tail(&t->tqent_list, &tq->tq_delay_list);
	TQENT_SET_LIST(t, TQENT_LIST_DELAY);
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = rc = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_timer.function = task_expire;
	t->tqent_timer.expires = (unsigned long)expire_time;
	add_timer(&t->tqent_timer);

	ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

	spin_unlock(&t->tqent_lock);

	TQSTAT_INC(tq, tasks_dispatched_delayed);

	/* Spawn additional taskq threads if required. */
	if (tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	return (rc);
}
EXPORT_SYMBOL(taskq_dispatch_delay);

void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *t)
{
	unsigned long irqflags;
	ASSERT(tq);
	ASSERT(func);

	spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
	    tq->tq_lock_class);

	/* Taskq being destroyed and all tasks drained */
	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
		t->tqent_id = TASKQID_INVALID;
		goto out;
	}

	if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) {
		/* Dynamic taskq may be able to spawn another thread */
		if (taskq_thread_spawn(tq) == 0)
			goto out;
		flags |= TQ_FRONT;
	}

	spin_lock(&t->tqent_lock);

	/*
	 * Make sure the entry is not on some other taskq; it is important to
	 * ASSERT() under lock
	 */
	ASSERT(taskq_empty_ent(t));

	/*
	 * Mark it as a prealloc'd task.  This is important
	 * to ensure that we don't free it later.
	 */
	t->tqent_flags |= TQENT_FLAG_PREALLOC;

	/* Queue to the priority list instead of the pending list */
	if (flags & TQ_FRONT) {
		TQENT_SET_LIST(t, TQENT_LIST_PRIORITY);
		list_add_tail(&t->tqent_list, &tq->tq_prio_list);
	} else {
		TQENT_SET_LIST(t, TQENT_LIST_PENDING);
		list_add_tail(&t->tqent_list, &tq->tq_pend_list);
	}
	TQSTAT_INC_LIST(tq, t);
	TQSTAT_INC(tq, tasks_total);

	t->tqent_id = tq->tq_next_id;
	tq->tq_next_id++;
	t->tqent_func = func;
	t->tqent_arg = arg;
	t->tqent_taskq = tq;
	t->tqent_birth = jiffies;

	spin_unlock(&t->tqent_lock);

	wake_up(&tq->tq_work_waitq);

	TQSTAT_INC(tq, tasks_dispatched);

	/* Spawn additional taskq threads if required. */
	if (tq->tq_nactive == tq->tq_nthreads)
		(void) taskq_thread_spawn(tq);
out:
	spin_unlock_irqrestore(&tq->tq_lock, irqflags);
}
EXPORT_SYMBOL(taskq_dispatch_ent);

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (list_empty(&t->tqent_list));
}
EXPORT_SYMBOL(taskq_empty_ent);

void
taskq_init_ent(taskq_ent_t *t)
{
	spin_lock_init(&t->tqent_lock);
	init_waitqueue_head(&t->tqent_waitq);
	timer_setup(&t->tqent_timer, NULL, 0);
	INIT_LIST_HEAD(&t->tqent_list);
	t->tqent_id = 0;
	t->tqent_func = NULL;
	t->tqent_arg = NULL;
	t->tqent_flags = 0;
	t->tqent_taskq = NULL;
}
EXPORT_SYMBOL(taskq_init_ent);

/*
 * Return the next pending task, preference is given to tasks on the
 * priority list which were dispatched with TQ_FRONT.
 */
static taskq_ent_t *
taskq_next_ent(taskq_t *tq)
{
	struct list_head *list;

	if (!list_empty(&tq->tq_prio_list))
		list = &tq->tq_prio_list;
	else if (!list_empty(&tq->tq_pend_list))
		list = &tq->tq_pend_list;
	else
		return (NULL);

	return (list_entry(list->next, taskq_ent_t, tqent_list));
}

/*
 * Spawns a new thread for the specified taskq.
 */
static void
taskq_thread_spawn_task(void *arg)
{
	taskq_t *tq = (taskq_t *)arg;
	unsigned long flags;

	if (taskq_thread_create(tq) == NULL) {
		/* restore spawning count if failed */
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
		tq->tq_nspawn--;
		spin_unlock_irqrestore(&tq->tq_lock, flags);
	}
}

/*
 * Spawn additional threads for dynamic taskqs (TASKQ_DYNAMIC) when the
 * current number of threads is insufficient to handle the pending tasks.
 * These new threads must be created by the dedicated dynamic_taskq to avoid
 * deadlocks between thread creation and memory reclaim.  The system_taskq
 * which is also a dynamic taskq cannot be safely used for this.
 */
static int
taskq_thread_spawn(taskq_t *tq)
{
	int spawning = 0;

	if (!(tq->tq_flags & TASKQ_DYNAMIC))
		return (0);

	tq->lastspawnstop = jiffies;
	if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) &&
	    (tq->tq_flags & TASKQ_ACTIVE)) {
		spawning = (++tq->tq_nspawn);
		taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task,
		    tq, TQ_NOSLEEP);
	}

	return (spawning);
}

/*
 * Threads in a dynamic taskq may exit once there is no more work to do.
 * To prevent threads from being created and destroyed too often limit
 * the exit rate to one per spl_taskq_thread_timeout_ms.
 *
 * The first thread in the thread list is treated as the primary thread.
 * There is nothing special about the primary thread but in order to avoid
 * all the taskq pids from changing we opt to make it long running.
 */
static int
taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt)
{
	ASSERT(!taskq_next_ent(tq));
	if (!(tq->tq_flags & TASKQ_DYNAMIC) || !spl_taskq_thread_dynamic)
		return (0);
	if (!(tq->tq_flags & TASKQ_ACTIVE))
		return (1);
	if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t,
	    tqt_thread_list) == tqt)
		return (0);
	ASSERT3U(tq->tq_nthreads, >, 1);
	if (tq->tq_nspawn != 0)
		return (0);
	if (time_before(jiffies, tq->lastspawnstop +
	    msecs_to_jiffies(spl_taskq_thread_timeout_ms)))
		return (0);
	tq->lastspawnstop = jiffies;
	return (1);
}

static int
taskq_thread(void *args)
{
	DECLARE_WAITQUEUE(wait, current);
	sigset_t blocked;
	taskq_thread_t *tqt = args;
	taskq_t *tq;
	taskq_ent_t *t;
	int seq_tasks = 0;
	unsigned long flags;
	taskq_ent_t dup_task = {};

	ASSERT(tqt);
	ASSERT(tqt->tqt_tq);
	tq = tqt->tqt_tq;
	current->flags |= PF_NOFREEZE;

	(void) spl_fstrans_mark();

	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	tsd_set(taskq_tsd, tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	/*
	 * If we are dynamically spawned, decrease spawning count. Note that
	 * we could be created during taskq_create, in which case we shouldn't
	 * do the decrement. But it's fine because taskq_create will reset
	 * tq_nspawn later.
	 */
	if (tq->tq_flags & TASKQ_DYNAMIC)
		tq->tq_nspawn--;

	/* Immediately exit if more threads than allowed were created. */
	if (tq->tq_nthreads >= tq->tq_maxthreads)
		goto error;

	tq->tq_nthreads++;
	list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list);
	wake_up(&tq->tq_wait_waitq);
	set_current_state(TASK_INTERRUPTIBLE);

	TQSTAT_INC(tq, threads_total);

	while (!kthread_should_stop()) {

		if (list_empty(&tq->tq_pend_list) &&
		    list_empty(&tq->tq_prio_list)) {

			if (taskq_thread_should_stop(tq, tqt))
				break;

			add_wait_queue_exclusive(&tq->tq_work_waitq, &wait);
			spin_unlock_irqrestore(&tq->tq_lock, flags);

			TQSTAT_INC(tq, thread_sleeps);
			TQSTAT_INC(tq, threads_idle);

			schedule();
			seq_tasks = 0;

			TQSTAT_DEC(tq, threads_idle);
			TQSTAT_INC(tq, thread_wakeups);

			spin_lock_irqsave_nested(&tq->tq_lock, flags,
			    tq->tq_lock_class);
			remove_wait_queue(&tq->tq_work_waitq, &wait);
		} else {
			__set_current_state(TASK_RUNNING);
		}

		if ((t = taskq_next_ent(tq)) != NULL) {
			list_del_init(&t->tqent_list);
			TQSTAT_DEC_LIST(tq, t);
			TQSTAT_DEC(tq, tasks_total);

			/*
			 * A TQENT_FLAG_PREALLOC task may be reused or freed
			 * during the task function call. Store tqent_id and
			 * tqent_flags here.
			 */
			tqt->tqt_id = t->tqent_id;
			tqt->tqt_flags = t->tqent_flags;

			if (t->tqent_flags & TQENT_FLAG_PREALLOC) {
				dup_task = *t;
				t = &dup_task;
			}
			tqt->tqt_task = t;

			taskq_insert_in_order(tq, tqt);
			tq->tq_nactive++;
			spin_unlock_irqrestore(&tq->tq_lock, flags);

			TQSTAT_INC(tq, threads_active);

			/* Perform the requested task */
			t->tqent_func(t->tqent_arg);

			TQSTAT_DEC(tq, threads_active);
			if ((t->tqent_flags & TQENT_LIST_MASK) ==
			    TQENT_LIST_PENDING)
				TQSTAT_INC(tq, tasks_executed_normal);
			else
				TQSTAT_INC(tq, tasks_executed_priority);
			TQSTAT_INC(tq, tasks_executed);

			spin_lock_irqsave_nested(&tq->tq_lock, flags,
			    tq->tq_lock_class);

			tq->tq_nactive--;
			list_del_init(&tqt->tqt_active_list);
			tqt->tqt_task = NULL;

			/* For prealloc'd tasks, we don't free anything. */
			if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC))
				task_done(tq, t);

			/*
			 * When the current lowest outstanding taskqid is
			 * done calculate the new lowest outstanding id
			 */
			if (tq->tq_lowest_id == tqt->tqt_id) {
				tq->tq_lowest_id = taskq_lowest_id(tq);
				ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id);
			}

			/* Spawn additional taskq threads if required. */
			if ((++seq_tasks) > spl_taskq_thread_sequential &&
			    taskq_thread_spawn(tq))
				seq_tasks = 0;

			tqt->tqt_id = TASKQID_INVALID;
			tqt->tqt_flags = 0;
			wake_up_all(&tq->tq_wait_waitq);
		} else
			TQSTAT_INC(tq, thread_wakeups_nowork);

		set_current_state(TASK_INTERRUPTIBLE);

	}

	__set_current_state(TASK_RUNNING);
	tq->tq_nthreads--;
	list_del_init(&tqt->tqt_thread_list);

	TQSTAT_DEC(tq, threads_total);
	TQSTAT_INC(tq, threads_destroyed);

error:
	kmem_free(tqt, sizeof (taskq_thread_t));
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	tsd_set(taskq_tsd, NULL);
	thread_exit();

	return (0);
}

static taskq_thread_t *
taskq_thread_create(taskq_t *tq)
{
	static int last_used_cpu = 0;
	taskq_thread_t *tqt;

	tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE);
	INIT_LIST_HEAD(&tqt->tqt_thread_list);
	INIT_LIST_HEAD(&tqt->tqt_active_list);
	tqt->tqt_tq = tq;
	tqt->tqt_id = TASKQID_INVALID;

	tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt,
	    "%s", tq->tq_name);
	if (tqt->tqt_thread == NULL) {
		kmem_free(tqt, sizeof (taskq_thread_t));
		return (NULL);
	}

	if (spl_taskq_thread_bind) {
		last_used_cpu = (last_used_cpu + 1) % num_online_cpus();
		kthread_bind(tqt->tqt_thread, last_used_cpu);
	}

	if (spl_taskq_thread_priority)
		set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri));

	wake_up_process(tqt->tqt_thread);

	TQSTAT_INC(tq, threads_created);

	return (tqt);
}

static void
taskq_stats_init(taskq_t *tq)
{
	taskq_sums_t *tqs = &tq->tq_sums;
	wmsum_init(&tqs->tqs_threads_active, 0);
	wmsum_init(&tqs->tqs_threads_idle, 0);
	wmsum_init(&tqs->tqs_threads_total, 0);
	wmsum_init(&tqs->tqs_tasks_pending, 0);
	wmsum_init(&tqs->tqs_tasks_priority, 0);
	wmsum_init(&tqs->tqs_tasks_total, 0);
	wmsum_init(&tqs->tqs_tasks_delayed, 0);
	wmsum_init(&tqs->tqs_entries_free, 0);
	wmsum_init(&tqs->tqs_threads_created, 0);
	wmsum_init(&tqs->tqs_threads_destroyed, 0);
	wmsum_init(&tqs->tqs_tasks_dispatched, 0);
	wmsum_init(&tqs->tqs_tasks_dispatched_delayed, 0);
	wmsum_init(&tqs->tqs_tasks_executed_normal, 0);
	wmsum_init(&tqs->tqs_tasks_executed_priority, 0);
	wmsum_init(&tqs->tqs_tasks_executed, 0);
	wmsum_init(&tqs->tqs_tasks_delayed_requeued, 0);
	wmsum_init(&tqs->tqs_tasks_cancelled, 0);
	wmsum_init(&tqs->tqs_thread_wakeups, 0);
	wmsum_init(&tqs->tqs_thread_wakeups_nowork, 0);
	wmsum_init(&tqs->tqs_thread_sleeps, 0);
}

static void
taskq_stats_fini(taskq_t *tq)
{
	taskq_sums_t *tqs = &tq->tq_sums;
	wmsum_fini(&tqs->tqs_threads_active);
	wmsum_fini(&tqs->tqs_threads_idle);
	wmsum_fini(&tqs->tqs_threads_total);
	wmsum_fini(&tqs->tqs_tasks_pending);
	wmsum_fini(&tqs->tqs_tasks_priority);
	wmsum_fini(&tqs->tqs_tasks_total);
	wmsum_fini(&tqs->tqs_tasks_delayed);
	wmsum_fini(&tqs->tqs_entries_free);
	wmsum_fini(&tqs->tqs_threads_created);
	wmsum_fini(&tqs->tqs_threads_destroyed);
	wmsum_fini(&tqs->tqs_tasks_dispatched);
	wmsum_fini(&tqs->tqs_tasks_dispatched_delayed);
	wmsum_fini(&tqs->tqs_tasks_executed_normal);
	wmsum_fini(&tqs->tqs_tasks_executed_priority);
	wmsum_fini(&tqs->tqs_tasks_executed);
	wmsum_fini(&tqs->tqs_tasks_delayed_requeued);
	wmsum_fini(&tqs->tqs_tasks_cancelled);
	wmsum_fini(&tqs->tqs_thread_wakeups);
	wmsum_fini(&tqs->tqs_thread_wakeups_nowork);
	wmsum_fini(&tqs->tqs_thread_sleeps);
}

static int
taskq_kstats_update(kstat_t *ksp, int rw)
{
	if (rw == KSTAT_WRITE)
		return (EACCES);

	taskq_t *tq = ksp->ks_private;
	taskq_kstats_t *tqks = ksp->ks_data;

	tqks->tqks_threads_max.value.ui64 = tq->tq_maxthreads;
	tqks->tqks_entry_pool_min.value.ui64 = tq->tq_minalloc;
	tqks->tqks_entry_pool_max.value.ui64 = tq->tq_maxalloc;

	taskq_sums_t *tqs = &tq->tq_sums;

	tqks->tqks_threads_active.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_active);
	tqks->tqks_threads_idle.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_idle);
	tqks->tqks_threads_total.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_total);
	tqks->tqks_tasks_pending.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_pending);
	tqks->tqks_tasks_priority.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_priority);
	tqks->tqks_tasks_total.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_total);
	tqks->tqks_tasks_delayed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_delayed);
	tqks->tqks_entries_free.value.ui64 =
	    wmsum_value(&tqs->tqs_entries_free);
	tqks->tqks_threads_created.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_created);
	tqks->tqks_threads_destroyed.value.ui64 =
	    wmsum_value(&tqs->tqs_threads_destroyed);
	tqks->tqks_tasks_dispatched.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_dispatched);
	tqks->tqks_tasks_dispatched_delayed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_dispatched_delayed);
	tqks->tqks_tasks_executed_normal.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed_normal);
	tqks->tqks_tasks_executed_priority.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed_priority);
	tqks->tqks_tasks_executed.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_executed);
	tqks->tqks_tasks_delayed_requeued.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_delayed_requeued);
	tqks->tqks_tasks_cancelled.value.ui64 =
	    wmsum_value(&tqs->tqs_tasks_cancelled);
	tqks->tqks_thread_wakeups.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_wakeups);
	tqks->tqks_thread_wakeups_nowork.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_wakeups_nowork);
	tqks->tqks_thread_sleeps.value.ui64 =
	    wmsum_value(&tqs->tqs_thread_sleeps);

	return (0);
}

static void
taskq_kstats_init(taskq_t *tq)
{
	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
	snprintf(name, sizeof (name), "%s.%d", tq->tq_name, tq->tq_instance);

	kstat_t *ksp = kstat_create("taskq", 0, name, "misc",
	    KSTAT_TYPE_NAMED, sizeof (taskq_kstats_t) / sizeof (kstat_named_t),
	    KSTAT_FLAG_VIRTUAL);

	if (ksp == NULL)
		return;

	ksp->ks_private = tq;
	ksp->ks_update = taskq_kstats_update;
	ksp->ks_data = kmem_alloc(sizeof (taskq_kstats_t), KM_SLEEP);
	memcpy(ksp->ks_data, &taskq_kstats_template, sizeof (taskq_kstats_t));
	kstat_install(ksp);

	tq->tq_ksp = ksp;
}

static void
taskq_kstats_fini(taskq_t *tq)
{
	if (tq->tq_ksp == NULL)
		return;

	kmem_free(tq->tq_ksp->ks_data, sizeof (taskq_kstats_t));
	kstat_delete(tq->tq_ksp);

	tq->tq_ksp = NULL;
}

taskq_t *
taskq_create(const char *name, int threads_arg, pri_t pri,
    int minalloc, int maxalloc, uint_t flags)
{
	taskq_t *tq;
	taskq_thread_t *tqt;
	int count = 0, rc = 0, i;
	unsigned long irqflags;
	int nthreads = threads_arg;

	ASSERT(name != NULL);
	ASSERT(minalloc >= 0);
	ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */

	/* Scale the number of threads using nthreads as a percentage */
	if (flags & TASKQ_THREADS_CPU_PCT) {
		ASSERT(nthreads <= 100);
		ASSERT(nthreads >= 0);
		nthreads = MIN(threads_arg, 100);
		nthreads = MAX(nthreads, 0);
		nthreads = MAX((num_online_cpus() * nthreads) / 100, 1);
	}

	tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE);
	if (tq == NULL)
		return (NULL);

	tq->tq_hp_support = B_FALSE;

	if (flags & TASKQ_THREADS_CPU_PCT) {
		tq->tq_hp_support = B_TRUE;
		if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state,
		    &tq->tq_hp_cb_node) != 0) {
			kmem_free(tq, sizeof (*tq));
			return (NULL);
		}
	}

	spin_lock_init(&tq->tq_lock);
	INIT_LIST_HEAD(&tq->tq_thread_list);
	INIT_LIST_HEAD(&tq->tq_active_list);
	tq->tq_name = kmem_strdup(name);
	tq->tq_nactive = 0;
	tq->tq_nthreads = 0;
	tq->tq_nspawn = 0;
	tq->tq_maxthreads = nthreads;
	tq->tq_cpu_pct = threads_arg;
	tq->tq_pri = pri;
	tq->tq_minalloc = minalloc;
	tq->tq_maxalloc = maxalloc;
	tq->tq_nalloc = 0;
	tq->tq_flags = (flags | TASKQ_ACTIVE);
	tq->tq_next_id = TASKQID_INITIAL;
	tq->tq_lowest_id = TASKQID_INITIAL;
	tq->lastspawnstop = jiffies;
	INIT_LIST_HEAD(&tq->tq_free_list);
	INIT_LIST_HEAD(&tq->tq_pend_list);
	INIT_LIST_HEAD(&tq->tq_prio_list);
	INIT_LIST_HEAD(&tq->tq_delay_list);
	init_waitqueue_head(&tq->tq_work_waitq);
	init_waitqueue_head(&tq->tq_wait_waitq);
	tq->tq_lock_class = TQ_LOCK_GENERAL;
	INIT_LIST_HEAD(&tq->tq_taskqs);
	taskq_stats_init(tq);

	if (flags & TASKQ_PREPOPULATE) {
		spin_lock_irqsave_nested(&tq->tq_lock, irqflags,
		    tq->tq_lock_class);

		for (i = 0; i < minalloc; i++)
			task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW,
			    &irqflags));

		spin_unlock_irqrestore(&tq->tq_lock, irqflags);
	}

	if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic)
		nthreads = 1;

	for (i = 0; i < nthreads; i++) {
		tqt = taskq_thread_create(tq);
		if (tqt == NULL)
			rc = 1;
		else
			count++;
	}

	/* Wait for all threads to be started before potential destroy */
	wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count);
	/*
	 * taskq_thread might have touched nspawn, but we don't want them to
	 * because they're not dynamically spawned.  So we reset it to 0
	 */
	tq->tq_nspawn = 0;

	if (rc) {
		taskq_destroy(tq);
		return (NULL);
	}

	down_write(&tq_list_sem);
	tq->tq_instance = taskq_find_by_name(name) + 1;
	list_add_tail(&tq->tq_taskqs, &tq_list);
	up_write(&tq_list_sem);

	/* Install kstats late, because the name includes tq_instance */
	taskq_kstats_init(tq);

	return (tq);
}
EXPORT_SYMBOL(taskq_create);

void
taskq_destroy(taskq_t *tq)
{
	struct task_struct *thread;
	taskq_thread_t *tqt;
	taskq_ent_t *t;
	unsigned long flags;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	tq->tq_flags &= ~TASKQ_ACTIVE;
	spin_unlock_irqrestore(&tq->tq_lock, flags);

	if (tq->tq_hp_support) {
		VERIFY0(cpuhp_state_remove_instance_nocalls(
		    spl_taskq_cpuhp_state, &tq->tq_hp_cb_node));
	}

	/*
	 * When TASKQ_ACTIVE is clear new tasks may not be added nor may
	 * new worker threads be spawned for dynamic taskq.
	 */
	if (dynamic_taskq != NULL)
		taskq_wait_outstanding(dynamic_taskq, 0);

	taskq_wait(tq);

	taskq_kstats_fini(tq);

	/* remove taskq from global list used by the kstats */
	down_write(&tq_list_sem);
	list_del(&tq->tq_taskqs);
	up_write(&tq_list_sem);

	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);
	/* wait for spawning threads to insert themselves to the list */
	while (tq->tq_nspawn) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		schedule_timeout_interruptible(1);
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
	}

	/*
	 * Signal each thread to exit and block until it does.  Each thread
	 * is responsible for removing itself from the list and freeing its
	 * taskq_thread_t.  This allows for idle threads to opt to remove
	 * themselves from the taskq.  They can be recreated as needed.
	 */
	while (!list_empty(&tq->tq_thread_list)) {
		tqt = list_entry(tq->tq_thread_list.next,
		    taskq_thread_t, tqt_thread_list);
		thread = tqt->tqt_thread;
		spin_unlock_irqrestore(&tq->tq_lock, flags);

		kthread_stop(thread);

		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
	}

	while (!list_empty(&tq->tq_free_list)) {
		t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list);

		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));

		list_del_init(&t->tqent_list);
		task_free(tq, t);
	}

	ASSERT0(tq->tq_nthreads);
	ASSERT0(tq->tq_nalloc);
	ASSERT0(tq->tq_nspawn);
	ASSERT(list_empty(&tq->tq_thread_list));
	ASSERT(list_empty(&tq->tq_active_list));
	ASSERT(list_empty(&tq->tq_free_list));
	ASSERT(list_empty(&tq->tq_pend_list));
	ASSERT(list_empty(&tq->tq_prio_list));
	ASSERT(list_empty(&tq->tq_delay_list));

	spin_unlock_irqrestore(&tq->tq_lock, flags);

	taskq_stats_fini(tq);
	kmem_strfree(tq->tq_name);
	kmem_free(tq, sizeof (taskq_t));
}
EXPORT_SYMBOL(taskq_destroy);

/*
 * Create a taskq with a specified number of pool threads. Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool.  The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	taskq_thread_t *tqt;
	int i = 0;
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	/* taskq_create spawns all the threads before returning */
	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	list_for_each_entry(tqt, &tq->tq_thread_list, tqt_thread_list) {
		kthreads[i] = tqt->tqt_thread;
		i++;
	}

	ASSERT3S(i, ==, nthreads);
	*ktpp = kthreads;

	return (tq);
}
EXPORT_SYMBOL(taskq_create_synced);

static kstat_t *taskq_summary_ksp = NULL;

static int
spl_taskq_kstat_headers(char *buf, size_t size)
{
	size_t n = snprintf(buf, size,
	    "%-20s | %-17s | %-23s\n"
	    "%-20s | %-17s | %-23s\n"
	    "%-20s | %-17s | %-23s\n",
	    "", "threads", "tasks on queue",
	    "taskq name", "tot [act idl] max", " pend [ norm high] dly",
	    "--------------------", "-----------------",
	    "-----------------------");
	return (n >= size ? ENOMEM : 0);
}

static int
spl_taskq_kstat_data(char *buf, size_t size, void *data)
{
	struct list_head *tql = NULL;
	taskq_t *tq;
	char name[TASKQ_NAMELEN+5]; /* 5 for dot, 3x instance digits, null */
	char threads[25];
	char tasks[30];
	size_t n;
	int err = 0;

	down_read(&tq_list_sem);
	list_for_each_prev(tql, &tq_list) {
		tq = list_entry(tql, taskq_t, tq_taskqs);

		mutex_enter(tq->tq_ksp->ks_lock);
		taskq_kstats_update(tq->tq_ksp, KSTAT_READ);
		taskq_kstats_t *tqks = tq->tq_ksp->ks_data;

		snprintf(name, sizeof (name), "%s.%d", tq->tq_name,
		    tq->tq_instance);
		snprintf(threads, sizeof (threads), "%3llu [%3llu %3llu] %3llu",
		    tqks->tqks_threads_total.value.ui64,
		    tqks->tqks_threads_active.value.ui64,
		    tqks->tqks_threads_idle.value.ui64,
		    tqks->tqks_threads_max.value.ui64);
		snprintf(tasks, sizeof (tasks), "%5llu [%5llu %5llu] %3llu",
		    tqks->tqks_tasks_total.value.ui64,
		    tqks->tqks_tasks_pending.value.ui64,
		    tqks->tqks_tasks_priority.value.ui64,
		    tqks->tqks_tasks_delayed.value.ui64);

		mutex_exit(tq->tq_ksp->ks_lock);

		n = snprintf(buf, size, "%-20s | %-17s | %-23s\n",
		    name, threads, tasks);
		if (n >= size) {
			err = ENOMEM;
			break;
		}

		buf = &buf[n];
		size -= n;
	}

	up_read(&tq_list_sem);

	return (err);
}

static void
spl_taskq_kstat_init(void)
{
	kstat_t *ksp = kstat_create("taskq", 0, "summary", "misc",
	    KSTAT_TYPE_RAW, 0, KSTAT_FLAG_VIRTUAL);

	if (ksp == NULL)
		return;

	ksp->ks_data = (void *)(uintptr_t)1;
	ksp->ks_ndata = 1;
	kstat_set_raw_ops(ksp, spl_taskq_kstat_headers,
	    spl_taskq_kstat_data, NULL);
	kstat_install(ksp);

	taskq_summary_ksp = ksp;
}

static void
spl_taskq_kstat_fini(void)
{
	if (taskq_summary_ksp == NULL)
		return;

	kstat_delete(taskq_summary_ksp);
	taskq_summary_ksp = NULL;
}

static unsigned int spl_taskq_kick = 0;

static int
param_set_taskq_kick(const char *val, zfs_kernel_param_t *kp)
{
	int ret;
	taskq_t *tq = NULL;
	taskq_ent_t *t;
	unsigned long flags;

	ret = param_set_uint(val, kp);
	if (ret < 0 || !spl_taskq_kick)
		return (ret);
	/* reset value */
	spl_taskq_kick = 0;

	down_read(&tq_list_sem);
	list_for_each_entry(tq, &tq_list, tq_taskqs) {
		spin_lock_irqsave_nested(&tq->tq_lock, flags,
		    tq->tq_lock_class);
		/* Check if the first pending is older than 5 seconds */
		t = taskq_next_ent(tq);
		if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) {
			(void) taskq_thread_spawn(tq);
			printk(KERN_INFO "spl: Kicked taskq %s/%d\n",
			    tq->tq_name, tq->tq_instance);
		}
		spin_unlock_irqrestore(&tq->tq_lock, flags);
	}
	up_read(&tq_list_sem);
	return (ret);
}

module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint,
    &spl_taskq_kick, 0644);
MODULE_PARM_DESC(spl_taskq_kick,
    "Write nonzero to kick stuck taskqs to spawn more threads");

/*
 * This callback will be called exactly once for each core that comes online,
 * for each dynamic taskq. We attempt to expand taskqs that have
 * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every
 * time, to correctly determine whether or not to add a thread.
 */
static int
spl_taskq_expand(unsigned int cpu, struct hlist_node *node)
{
	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
	unsigned long flags;
	int err = 0;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (!(tq->tq_flags & TASKQ_ACTIVE)) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		return (err);
	}

	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
	int nthreads = MIN(tq->tq_cpu_pct, 100);
	nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1);
	tq->tq_maxthreads = nthreads;

	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
	    tq->tq_maxthreads > tq->tq_nthreads) {
		spin_unlock_irqrestore(&tq->tq_lock, flags);
		taskq_thread_t *tqt = taskq_thread_create(tq);
		if (tqt == NULL)
			err = -1;
		return (err);
	}
	spin_unlock_irqrestore(&tq->tq_lock, flags);
	return (err);
}

/*
 * While we don't support offlining CPUs, it is possible that CPUs will fail
 * to online successfully.  We do need to be able to handle this case
 * gracefully.
 */
static int
spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node)
{
	taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node);
	unsigned long flags;

	ASSERT(tq);
	spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class);

	if (!(tq->tq_flags & TASKQ_ACTIVE))
		goto out;

	ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
	int nthreads = MIN(tq->tq_cpu_pct, 100);
	nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1);
	tq->tq_maxthreads = nthreads;

	if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) &&
	    tq->tq_maxthreads < tq->tq_nthreads) {
		ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1);
		taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next,
		    taskq_thread_t, tqt_thread_list);
		struct task_struct *thread = tqt->tqt_thread;
		spin_unlock_irqrestore(&tq->tq_lock, flags);

		kthread_stop(thread);

		return (0);
	}

out:
	spin_unlock_irqrestore(&tq->tq_lock, flags);
	return (0);
}

int
spl_taskq_init(void)
{
	init_rwsem(&tq_list_sem);
	tsd_create(&taskq_tsd, NULL);

	spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN,
	    "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down);

	system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64),
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
	if (system_taskq == NULL)
		return (-ENOMEM);

	system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4),
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC);
	if (system_delay_taskq == NULL) {
		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
		taskq_destroy(system_taskq);
		return (-ENOMEM);
	}

	dynamic_taskq = taskq_create("spl_dynamic_taskq", 1,
	    maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE);
	if (dynamic_taskq == NULL) {
		cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
		taskq_destroy(system_taskq);
		taskq_destroy(system_delay_taskq);
		return (-ENOMEM);
	}

	/*
	 * This is used to annotate tq_lock, so
	 *   taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch
	 * does not trigger a lockdep warning re: possible recursive locking
	 */
	dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC;

	spl_taskq_kstat_init();

	return (0);
}

void
spl_taskq_fini(void)
{
	spl_taskq_kstat_fini();

	taskq_destroy(dynamic_taskq);
	dynamic_taskq = NULL;

	taskq_destroy(system_delay_taskq);
	system_delay_taskq = NULL;

	taskq_destroy(system_taskq);
	system_taskq = NULL;

	tsd_destroy(&taskq_tsd);

	cpuhp_remove_multi_state(spl_taskq_cpuhp_state);
	spl_taskq_cpuhp_state = 0;
}