/*
 * Path: blob/main/sys/contrib/openzfs/lib/libspl/taskq.c
 * 96339 views
 */
// SPDX-License-Identifier: CDDL-1.01/*2* CDDL HEADER START3*4* The contents of this file are subject to the terms of the5* Common Development and Distribution License (the "License").6* You may not use this file except in compliance with the License.7*8* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE9* or https://opensource.org/licenses/CDDL-1.0.10* See the License for the specific language governing permissions11* and limitations under the License.12*13* When distributing Covered Code, include this CDDL HEADER in each14* file and include the License file at usr/src/OPENSOLARIS.LICENSE.15* If applicable, add the following below this CDDL HEADER, with the16* fields enclosed by brackets "[]" replaced with your own identifying17* information: Portions Copyright [yyyy] [name of copyright owner]18*19* CDDL HEADER END20*/21/*22* Copyright 2010 Sun Microsystems, Inc. All rights reserved.23* Use is subject to license terms.24*/25/*26* Copyright 2011 Nexenta Systems, Inc. All rights reserved.27* Copyright 2012 Garrett D'Amore <[email protected]>. All rights reserved.28* Copyright (c) 2014 by Delphix. 
All rights reserved.29*/3031#include <sys/sysmacros.h>32#include <sys/timer.h>33#include <sys/types.h>34#include <sys/thread.h>35#include <sys/taskq.h>36#include <sys/kmem.h>37#include <pthread.h>3839static pthread_key_t taskq_tsd;40static pthread_once_t taskq_tsd_once = PTHREAD_ONCE_INIT;4142static taskq_t *__system_taskq = NULL;43static taskq_t *__system_delay_taskq = NULL;4445taskq_t46*_system_taskq(void)47{48return (__system_taskq);49}5051taskq_t52*_system_delay_taskq(void)53{54return (__system_delay_taskq);55}5657#define TASKQ_ACTIVE 0x000100005859static taskq_ent_t *60task_alloc(taskq_t *tq, int tqflags)61{62taskq_ent_t *t;63int rv;6465again: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {66ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));67tq->tq_freelist = t->tqent_next;68} else {69if (tq->tq_nalloc >= tq->tq_maxalloc) {70if (!(tqflags & KM_SLEEP))71return (NULL);7273/*74* We don't want to exceed tq_maxalloc, but we can't75* wait for other tasks to complete (and thus free up76* task structures) without risking deadlock with77* the caller. So, we just delay for one second78* to throttle the allocation rate. 
If we have tasks79* complete before one second timeout expires then80* taskq_ent_free will signal us and we will81* immediately retry the allocation.82*/83tq->tq_maxalloc_wait++;84rv = cv_timedwait(&tq->tq_maxalloc_cv,85&tq->tq_lock, ddi_get_lbolt() + hz);86tq->tq_maxalloc_wait--;87if (rv > 0)88goto again; /* signaled */89}90mutex_exit(&tq->tq_lock);9192t = kmem_alloc(sizeof (taskq_ent_t), tqflags);9394mutex_enter(&tq->tq_lock);95if (t != NULL) {96/* Make sure we start without any flags */97t->tqent_flags = 0;98tq->tq_nalloc++;99}100}101return (t);102}103104static void105task_free(taskq_t *tq, taskq_ent_t *t)106{107if (tq->tq_nalloc <= tq->tq_minalloc) {108t->tqent_next = tq->tq_freelist;109tq->tq_freelist = t;110} else {111tq->tq_nalloc--;112mutex_exit(&tq->tq_lock);113kmem_free(t, sizeof (taskq_ent_t));114mutex_enter(&tq->tq_lock);115}116117if (tq->tq_maxalloc_wait)118cv_signal(&tq->tq_maxalloc_cv);119}120121taskqid_t122taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)123{124taskq_ent_t *t;125126mutex_enter(&tq->tq_lock);127ASSERT(tq->tq_flags & TASKQ_ACTIVE);128if ((t = task_alloc(tq, tqflags)) == NULL) {129mutex_exit(&tq->tq_lock);130return (0);131}132if (tqflags & TQ_FRONT) {133t->tqent_next = tq->tq_task.tqent_next;134t->tqent_prev = &tq->tq_task;135} else {136t->tqent_next = &tq->tq_task;137t->tqent_prev = tq->tq_task.tqent_prev;138}139t->tqent_next->tqent_prev = t;140t->tqent_prev->tqent_next = t;141t->tqent_func = func;142t->tqent_arg = arg;143t->tqent_flags = 0;144cv_signal(&tq->tq_dispatch_cv);145mutex_exit(&tq->tq_lock);146return (1);147}148149taskqid_t150taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags,151clock_t expire_time)152{153(void) tq, (void) func, (void) arg, (void) tqflags, (void) expire_time;154return (0);155}156157int158taskq_empty_ent(taskq_ent_t *t)159{160return (t->tqent_next == NULL);161}162163void164taskq_init_ent(taskq_ent_t *t)165{166t->tqent_next = NULL;167t->tqent_prev = 
NULL;168t->tqent_func = NULL;169t->tqent_arg = NULL;170t->tqent_flags = 0;171}172173void174taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,175taskq_ent_t *t)176{177ASSERT(func != NULL);178179/*180* Mark it as a prealloc'd task. This is important181* to ensure that we don't free it later.182*/183t->tqent_flags |= TQENT_FLAG_PREALLOC;184/*185* Enqueue the task to the underlying queue.186*/187mutex_enter(&tq->tq_lock);188189if (flags & TQ_FRONT) {190t->tqent_next = tq->tq_task.tqent_next;191t->tqent_prev = &tq->tq_task;192} else {193t->tqent_next = &tq->tq_task;194t->tqent_prev = tq->tq_task.tqent_prev;195}196t->tqent_next->tqent_prev = t;197t->tqent_prev->tqent_next = t;198t->tqent_func = func;199t->tqent_arg = arg;200cv_signal(&tq->tq_dispatch_cv);201mutex_exit(&tq->tq_lock);202}203204void205taskq_wait(taskq_t *tq)206{207mutex_enter(&tq->tq_lock);208while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)209cv_wait(&tq->tq_wait_cv, &tq->tq_lock);210mutex_exit(&tq->tq_lock);211}212213void214taskq_wait_id(taskq_t *tq, taskqid_t id)215{216(void) id;217taskq_wait(tq);218}219220void221taskq_wait_outstanding(taskq_t *tq, taskqid_t id)222{223(void) id;224taskq_wait(tq);225}226227static void228taskq_tsd_init(void)229{230VERIFY0(pthread_key_create(&taskq_tsd, NULL));231}232233static __attribute__((noreturn)) void234taskq_thread(void *arg)235{236taskq_t *tq = arg;237taskq_ent_t *t;238boolean_t prealloc;239240pthread_once(&taskq_tsd_once, taskq_tsd_init);241VERIFY0(pthread_setspecific(taskq_tsd, tq));242243mutex_enter(&tq->tq_lock);244while (tq->tq_flags & TASKQ_ACTIVE) {245if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {246if (--tq->tq_active == 0)247cv_broadcast(&tq->tq_wait_cv);248cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);249tq->tq_active++;250continue;251}252t->tqent_prev->tqent_next = t->tqent_next;253t->tqent_next->tqent_prev = t->tqent_prev;254t->tqent_next = NULL;255t->tqent_prev = NULL;256prealloc = t->tqent_flags & 
TQENT_FLAG_PREALLOC;257mutex_exit(&tq->tq_lock);258259rw_enter(&tq->tq_threadlock, RW_READER);260t->tqent_func(t->tqent_arg);261rw_exit(&tq->tq_threadlock);262263mutex_enter(&tq->tq_lock);264if (!prealloc)265task_free(tq, t);266}267tq->tq_nthreads--;268cv_broadcast(&tq->tq_wait_cv);269mutex_exit(&tq->tq_lock);270thread_exit();271}272273taskq_t *274taskq_create(const char *name, int nthreads, pri_t pri,275int minalloc, int maxalloc, uint_t flags)276{277(void) pri;278taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);279int t;280281if (flags & TASKQ_THREADS_CPU_PCT) {282int pct;283ASSERT3S(nthreads, >=, 0);284ASSERT3S(nthreads, <=, 100);285pct = MIN(nthreads, 100);286pct = MAX(pct, 0);287288nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;289nthreads = MAX(nthreads, 1); /* need at least 1 thread */290} else {291ASSERT3S(nthreads, >=, 1);292}293294rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);295mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);296cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);297cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);298cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);299(void) strlcpy(tq->tq_name, name, sizeof (tq->tq_name));300tq->tq_flags = flags | TASKQ_ACTIVE;301tq->tq_active = nthreads;302tq->tq_nthreads = nthreads;303tq->tq_minalloc = minalloc;304tq->tq_maxalloc = maxalloc;305tq->tq_task.tqent_next = &tq->tq_task;306tq->tq_task.tqent_prev = &tq->tq_task;307tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),308KM_SLEEP);309310if (flags & TASKQ_PREPOPULATE) {311mutex_enter(&tq->tq_lock);312while (minalloc-- > 0)313task_free(tq, task_alloc(tq, KM_SLEEP));314mutex_exit(&tq->tq_lock);315}316317for (t = 0; t < nthreads; t++)318VERIFY((tq->tq_threadlist[t] = thread_create_named(tq->tq_name,319NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);320321return (tq);322}323324void325taskq_destroy(taskq_t *tq)326{327int nthreads = 
tq->tq_nthreads;328329taskq_wait(tq);330331mutex_enter(&tq->tq_lock);332333tq->tq_flags &= ~TASKQ_ACTIVE;334cv_broadcast(&tq->tq_dispatch_cv);335336while (tq->tq_nthreads != 0)337cv_wait(&tq->tq_wait_cv, &tq->tq_lock);338339tq->tq_minalloc = 0;340while (tq->tq_nalloc != 0) {341ASSERT(tq->tq_freelist != NULL);342taskq_ent_t *tqent_nexttq = tq->tq_freelist->tqent_next;343task_free(tq, tq->tq_freelist);344tq->tq_freelist = tqent_nexttq;345}346347mutex_exit(&tq->tq_lock);348349kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));350351rw_destroy(&tq->tq_threadlock);352mutex_destroy(&tq->tq_lock);353cv_destroy(&tq->tq_dispatch_cv);354cv_destroy(&tq->tq_wait_cv);355cv_destroy(&tq->tq_maxalloc_cv);356357kmem_free(tq, sizeof (taskq_t));358}359360/*361* Create a taskq with a specified number of pool threads. Allocate362* and return an array of nthreads kthread_t pointers, one for each363* thread in the pool. The array is not ordered and must be freed364* by the caller.365*/366taskq_t *367taskq_create_synced(const char *name, int nthreads, pri_t pri,368int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)369{370taskq_t *tq;371kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,372KM_SLEEP);373374(void) pri; (void) minalloc; (void) maxalloc;375376flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);377378tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,379flags | TASKQ_PREPOPULATE);380VERIFY(tq != NULL);381VERIFY(tq->tq_nthreads == nthreads);382383for (int i = 0; i < nthreads; i++) {384kthreads[i] = tq->tq_threadlist[i];385}386*ktpp = kthreads;387return (tq);388}389390int391taskq_member(taskq_t *tq, kthread_t *t)392{393int i;394395for (i = 0; i < tq->tq_nthreads; i++)396if (tq->tq_threadlist[i] == t)397return (1);398399return (0);400}401402taskq_t *403taskq_of_curthread(void)404{405return (pthread_getspecific(taskq_tsd));406}407408int409taskq_cancel_id(taskq_t *tq, taskqid_t id, boolean_t wait)410{411(void) tq, (void) 
id, (void) wait;412return (ENOENT);413}414415void416system_taskq_init(void)417{418__system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,419TASKQ_DYNAMIC | TASKQ_PREPOPULATE);420__system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,421512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);422}423424void425system_taskq_fini(void)426{427taskq_destroy(__system_taskq);428__system_taskq = NULL; /* defensive */429taskq_destroy(__system_delay_taskq);430__system_delay_taskq = NULL;431}432433434