Path: blob/main/sys/contrib/openzfs/lib/libtpool/thread_pool.c
48375 views
// SPDX-License-Identifier: CDDL-1.01/*2* CDDL HEADER START3*4* The contents of this file are subject to the terms of the5* Common Development and Distribution License (the "License").6* You may not use this file except in compliance with the License.7*8* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE9* or https://opensource.org/licenses/CDDL-1.0.10* See the License for the specific language governing permissions11* and limitations under the License.12*13* When distributing Covered Code, include this CDDL HEADER in each14* file and include the License file at usr/src/OPENSOLARIS.LICENSE.15* If applicable, add the following below this CDDL HEADER, with the16* fields enclosed by brackets "[]" replaced with your own identifying17* information: Portions Copyright [yyyy] [name of copyright owner]18*19* CDDL HEADER END20*/2122/*23* Copyright 2008 Sun Microsystems, Inc. All rights reserved.24* Use is subject to license terms.25*/2627#include <stdlib.h>28#include <signal.h>29#include <errno.h>30#include <assert.h>31#include <limits.h>32#include "thread_pool_impl.h"3334static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;35static tpool_t *thread_pools = NULL;3637static void38delete_pool(tpool_t *tpool)39{40tpool_job_t *job;4142ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);4344/*45* Unlink the pool from the global list of all pools.46*/47(void) pthread_mutex_lock(&thread_pool_lock);48if (thread_pools == tpool)49thread_pools = tpool->tp_forw;50if (thread_pools == tpool)51thread_pools = NULL;52else {53tpool->tp_back->tp_forw = tpool->tp_forw;54tpool->tp_forw->tp_back = tpool->tp_back;55}56pthread_mutex_unlock(&thread_pool_lock);5758/*59* There should be no pending jobs, but just in case...60*/61for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {62tpool->tp_head = job->tpj_next;63free(job);64}65(void) pthread_attr_destroy(&tpool->tp_attr);66free(tpool);67}6869/*70* Worker thread is terminating.71*/72static void73worker_cleanup(void *arg)74{75tpool_t *tpool = (tpool_t *)arg;7677if (--tpool->tp_current == 0 &&78(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {79if (tpool->tp_flags & TP_ABANDON) {80pthread_mutex_unlock(&tpool->tp_mutex);81delete_pool(tpool);82return;83}84if (tpool->tp_flags & TP_DESTROY)85(void) pthread_cond_broadcast(&tpool->tp_busycv);86}87pthread_mutex_unlock(&tpool->tp_mutex);88}8990static void91notify_waiters(tpool_t *tpool)92{93if (tpool->tp_head == NULL && tpool->tp_active == NULL) {94tpool->tp_flags &= ~TP_WAIT;95(void) pthread_cond_broadcast(&tpool->tp_waitcv);96}97}9899/*100* Called by a worker thread on return from a tpool_dispatch()d job.101*/102static void103job_cleanup(void *arg)104{105tpool_t *tpool = (tpool_t *)arg;106107pthread_t my_tid = pthread_self();108tpool_active_t *activep;109tpool_active_t **activepp;110111pthread_mutex_lock(&tpool->tp_mutex);112for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {113activep = *activepp;114if (activep->tpa_tid == my_tid) {115*activepp = activep->tpa_next;116break;117}118}119if (tpool->tp_flags & TP_WAIT)120notify_waiters(tpool);121}122123static void *124tpool_worker(void *arg)125{126tpool_t *tpool = (tpool_t *)arg;127int elapsed;128tpool_job_t *job;129void (*func)(void *);130tpool_active_t active;131132pthread_mutex_lock(&tpool->tp_mutex);133pthread_cleanup_push(worker_cleanup, tpool);134135/*136* This is the worker's main loop.137* It will only be left if a timeout or an error has occurred.138*/139active.tpa_tid = pthread_self();140for (;;) {141elapsed = 0;142tpool->tp_idle++;143if (tpool->tp_flags & TP_WAIT)144notify_waiters(tpool);145while ((tpool->tp_head == NULL ||146(tpool->tp_flags & TP_SUSPEND)) &&147!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {148if (tpool->tp_current <= tpool->tp_minimum ||149tpool->tp_linger == 0) {150(void) pthread_cond_wait(&tpool->tp_workcv,151&tpool->tp_mutex);152} else {153struct timespec ts;154155clock_gettime(CLOCK_REALTIME, &ts);156ts.tv_sec += tpool->tp_linger;157158if (pthread_cond_timedwait(&tpool->tp_workcv,159&tpool->tp_mutex, &ts) != 0) {160elapsed = 1;161break;162}163}164}165tpool->tp_idle--;166if (tpool->tp_flags & TP_DESTROY)167break;168if (tpool->tp_flags & TP_ABANDON) {169/* can't abandon a suspended pool */170if (tpool->tp_flags & TP_SUSPEND) {171tpool->tp_flags &= ~TP_SUSPEND;172(void) pthread_cond_broadcast(173&tpool->tp_workcv);174}175if (tpool->tp_head == NULL)176break;177}178if ((job = tpool->tp_head) != NULL &&179!(tpool->tp_flags & TP_SUSPEND)) {180elapsed = 0;181func = job->tpj_func;182arg = job->tpj_arg;183tpool->tp_head = job->tpj_next;184if (job == tpool->tp_tail)185tpool->tp_tail = NULL;186tpool->tp_njobs--;187active.tpa_next = tpool->tp_active;188tpool->tp_active = &active;189pthread_mutex_unlock(&tpool->tp_mutex);190pthread_cleanup_push(job_cleanup, tpool);191free(job);192193sigset_t maskset;194(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);195196/*197* Call the specified function.198*/199func(arg);200/*201* We don't know what this thread has been doing,202* so we reset its signal mask and cancellation203* state back to the values prior to calling func().204*/205(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);206(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,207NULL);208(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,209NULL);210pthread_cleanup_pop(1);211}212if (elapsed && tpool->tp_current > tpool->tp_minimum) {213/*214* We timed out and there is no work to be done215* and the number of workers exceeds the minimum.216* Exit now to reduce the size of the pool.217*/218break;219}220}221pthread_cleanup_pop(1);222return (arg);223}224225/*226* Create a worker thread, with default signals blocked.227*/228static int229create_worker(tpool_t *tpool)230{231pthread_t thread;232sigset_t oset;233int error;234235(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);236error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);237(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);238return (error);239}240241242/*243* pthread_attr_clone: make a copy of a pthread_attr_t. When old_attr244* is NULL initialize the cloned attr using default values.245*/246static int247pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)248{249int error;250251error = pthread_attr_init(attr);252if (error || (old_attr == NULL))253return (error);254255#ifdef __GLIBC__256cpu_set_t cpuset;257size_t cpusetsize = sizeof (cpuset);258error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);259if (error == 0)260error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);261if (error)262goto error;263#endif /* __GLIBC__ */264265int detachstate;266error = pthread_attr_getdetachstate(old_attr, &detachstate);267if (error == 0)268error = pthread_attr_setdetachstate(attr, detachstate);269if (error)270goto error;271272size_t guardsize;273error = pthread_attr_getguardsize(old_attr, &guardsize);274if (error == 0)275error = pthread_attr_setguardsize(attr, guardsize);276if (error)277goto error;278279int inheritsched;280error = pthread_attr_getinheritsched(old_attr, &inheritsched);281if (error == 0)282error = pthread_attr_setinheritsched(attr, inheritsched);283if (error)284goto error;285286struct sched_param param;287error = pthread_attr_getschedparam(old_attr, ¶m);288if (error == 0)289error = pthread_attr_setschedparam(attr, ¶m);290if (error)291goto error;292293int policy;294error = pthread_attr_getschedpolicy(old_attr, &policy);295if (error == 0)296error = pthread_attr_setschedpolicy(attr, policy);297if (error)298goto error;299300int scope;301error = pthread_attr_getscope(old_attr, &scope);302if (error == 0)303error = pthread_attr_setscope(attr, scope);304if (error)305goto error;306307void *stackaddr;308size_t stacksize;309error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);310if (error == 0)311error = pthread_attr_setstack(attr, stackaddr, stacksize);312if (error)313goto error;314315return (0);316error:317pthread_attr_destroy(attr);318return (error);319}320321tpool_t *322tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,323pthread_attr_t *attr)324{325tpool_t *tpool;326void *stackaddr;327size_t stacksize;328size_t minstack;329int error;330331if (min_threads > max_threads || max_threads < 1) {332errno = EINVAL;333return (NULL);334}335if (attr != NULL) {336if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {337errno = EINVAL;338return (NULL);339}340/*341* Allow only one thread in the pool with a specified stack.342* Require threads to have at least the minimum stack size.343*/344minstack = PTHREAD_STACK_MIN;345if (stackaddr != NULL) {346if (stacksize < minstack || max_threads != 1) {347errno = EINVAL;348return (NULL);349}350} else if (stacksize != 0 && stacksize < minstack) {351errno = EINVAL;352return (NULL);353}354}355356tpool = calloc(1, sizeof (*tpool));357if (tpool == NULL) {358errno = ENOMEM;359return (NULL);360}361(void) pthread_mutex_init(&tpool->tp_mutex, NULL);362(void) pthread_cond_init(&tpool->tp_busycv, NULL);363(void) pthread_cond_init(&tpool->tp_workcv, NULL);364(void) pthread_cond_init(&tpool->tp_waitcv, NULL);365tpool->tp_minimum = min_threads;366tpool->tp_maximum = max_threads;367tpool->tp_linger = linger;368369/*370* We cannot just copy the attribute pointer.371* We need to initialize a new pthread_attr_t structure372* with the values from the user-supplied pthread_attr_t.373* If the attribute pointer is NULL, we need to initialize374* the new pthread_attr_t structure with default values.375*/376error = pthread_attr_clone(&tpool->tp_attr, attr);377if (error) {378free(tpool);379errno = error;380return (NULL);381}382383/* make all pool threads be detached daemon threads */384(void) pthread_attr_setdetachstate(&tpool->tp_attr,385PTHREAD_CREATE_DETACHED);386387/* insert into the global list of all thread pools */388pthread_mutex_lock(&thread_pool_lock);389if (thread_pools == NULL) {390tpool->tp_forw = tpool;391tpool->tp_back = tpool;392thread_pools = tpool;393} else {394thread_pools->tp_back->tp_forw = tpool;395tpool->tp_forw = thread_pools;396tpool->tp_back = thread_pools->tp_back;397thread_pools->tp_back = tpool;398}399pthread_mutex_unlock(&thread_pool_lock);400401return (tpool);402}403404/*405* Dispatch a work request to the thread pool.406* If there are idle workers, awaken one.407* Else, if the maximum number of workers has408* not been reached, spawn a new worker thread.409* Else just return with the job added to the queue.410*/411int412tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)413{414tpool_job_t *job;415416ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));417418if ((job = calloc(1, sizeof (*job))) == NULL)419return (-1);420job->tpj_next = NULL;421job->tpj_func = func;422job->tpj_arg = arg;423424pthread_mutex_lock(&tpool->tp_mutex);425426if (!(tpool->tp_flags & TP_SUSPEND)) {427if (tpool->tp_idle > 0)428(void) pthread_cond_signal(&tpool->tp_workcv);429else if (tpool->tp_current >= tpool->tp_maximum) {430/* At worker limit. Leave task on queue */431} else {432if (create_worker(tpool) == 0) {433/* Started a new worker thread */434tpool->tp_current++;435} else if (tpool->tp_current > 0) {436/* Leave task on queue */437} else {438/* Cannot start a single worker! */439pthread_mutex_unlock(&tpool->tp_mutex);440free(job);441return (-1);442}443}444}445446if (tpool->tp_head == NULL)447tpool->tp_head = job;448else449tpool->tp_tail->tpj_next = job;450tpool->tp_tail = job;451tpool->tp_njobs++;452453pthread_mutex_unlock(&tpool->tp_mutex);454return (0);455}456457static void458tpool_cleanup(void *arg)459{460tpool_t *tpool = (tpool_t *)arg;461462pthread_mutex_unlock(&tpool->tp_mutex);463}464465/*466* Assumes: by the time tpool_destroy() is called no one will use this467* thread pool in any way and no one will try to dispatch entries to it.468* Calling tpool_destroy() from a job in the pool will cause deadlock.469*/470void471tpool_destroy(tpool_t *tpool)472{473tpool_active_t *activep;474475ASSERT(!tpool_member(tpool));476ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));477478pthread_mutex_lock(&tpool->tp_mutex);479pthread_cleanup_push(tpool_cleanup, tpool);480481/* mark the pool as being destroyed; wakeup idle workers */482tpool->tp_flags |= TP_DESTROY;483tpool->tp_flags &= ~TP_SUSPEND;484(void) pthread_cond_broadcast(&tpool->tp_workcv);485486/* cancel all active workers */487for (activep = tpool->tp_active; activep; activep = activep->tpa_next)488(void) pthread_cancel(activep->tpa_tid);489490/* wait for all active workers to finish */491while (tpool->tp_active != NULL) {492tpool->tp_flags |= TP_WAIT;493(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);494}495496/* the last worker to terminate will wake us up */497while (tpool->tp_current != 0)498(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);499500pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */501delete_pool(tpool);502}503504/*505* Like tpool_destroy(), but don't cancel workers or wait for them to finish.506* The last worker to terminate will delete the pool.507*/508void509tpool_abandon(tpool_t *tpool)510{511ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));512513pthread_mutex_lock(&tpool->tp_mutex);514if (tpool->tp_current == 0) {515/* no workers, just delete the pool */516pthread_mutex_unlock(&tpool->tp_mutex);517delete_pool(tpool);518} else {519/* wake up all workers, last one will delete the pool */520tpool->tp_flags |= TP_ABANDON;521tpool->tp_flags &= ~TP_SUSPEND;522(void) pthread_cond_broadcast(&tpool->tp_workcv);523pthread_mutex_unlock(&tpool->tp_mutex);524}525}526527/*528* Wait for all jobs to complete.529* Calling tpool_wait() from a job in the pool will cause deadlock.530*/531void532tpool_wait(tpool_t *tpool)533{534ASSERT(!tpool_member(tpool));535ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));536537pthread_mutex_lock(&tpool->tp_mutex);538pthread_cleanup_push(tpool_cleanup, tpool);539while (tpool->tp_head != NULL || tpool->tp_active != NULL) {540tpool->tp_flags |= TP_WAIT;541(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);542ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));543}544pthread_cleanup_pop(1); /* pthread_mutex_unlock(&tpool->tp_mutex); */545}546547void548tpool_suspend(tpool_t *tpool)549{550ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));551552pthread_mutex_lock(&tpool->tp_mutex);553tpool->tp_flags |= TP_SUSPEND;554pthread_mutex_unlock(&tpool->tp_mutex);555}556557int558tpool_suspended(tpool_t *tpool)559{560int suspended;561562ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));563564pthread_mutex_lock(&tpool->tp_mutex);565suspended = (tpool->tp_flags & TP_SUSPEND) != 0;566pthread_mutex_unlock(&tpool->tp_mutex);567568return (suspended);569}570571void572tpool_resume(tpool_t *tpool)573{574int excess;575576ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));577578pthread_mutex_lock(&tpool->tp_mutex);579if (!(tpool->tp_flags & TP_SUSPEND)) {580pthread_mutex_unlock(&tpool->tp_mutex);581return;582}583tpool->tp_flags &= ~TP_SUSPEND;584(void) pthread_cond_broadcast(&tpool->tp_workcv);585excess = tpool->tp_njobs - tpool->tp_idle;586while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {587if (create_worker(tpool) != 0)588break; /* pthread_create() failed */589tpool->tp_current++;590}591pthread_mutex_unlock(&tpool->tp_mutex);592}593594int595tpool_member(tpool_t *tpool)596{597pthread_t my_tid = pthread_self();598tpool_active_t *activep;599600ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));601602pthread_mutex_lock(&tpool->tp_mutex);603for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {604if (activep->tpa_tid == my_tid) {605pthread_mutex_unlock(&tpool->tp_mutex);606return (1);607}608}609pthread_mutex_unlock(&tpool->tp_mutex);610return (0);611}612613614