Path: blob/master/core/object/worker_thread_pool.cpp
20934 views
/**************************************************************************/
/*  worker_thread_pool.cpp                                                */
/**************************************************************************/
/*                         This file is part of:                          */
/*                             GODOT ENGINE                               */
/*                        https://godotengine.org                         */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */
/*                                                                        */
/* Permission is hereby granted, free of charge, to any person obtaining  */
/* a copy of this software and associated documentation files (the        */
/* "Software"), to deal in the Software without restriction, including    */
/* without limitation the rights to use, copy, modify, merge, publish,    */
/* distribute, sublicense, and/or sell copies of the Software, and to     */
/* permit persons to whom the Software is furnished to do so, subject to  */
/* the following conditions:                                              */
/*                                                                        */
/* The above copyright notice and this permission notice shall be         */
/* included in all copies or substantial portions of the Software.        */
/*                                                                        */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */
/**************************************************************************/

#include "worker_thread_pool.h"

#include "core/object/script_language.h"
#include "core/os/os.h"
#include "core/os/safe_binary_mutex.h"
#include "core/os/thread_safe.h"

// Sentinel task pointer used by _wait_collaboratively() to mean "waiting for
// a yield to be over" rather than waiting on a real Task. Never dereferenced.
WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;

// Registry of secondary pools created via get_named_pool(), keyed by name.
HashMap<StringName, WorkerThreadPool *> WorkerThreadPool::named_pools;

// Frees the type-erased template userdata of a task.
// NOTE(review): despite the name, this deletes native_func_userdata (cast to
// BaseTemplateUserdata), with template_userdata only null-checked — presumably
// callers store the same object in both fields in this code path; confirm
// against the header/usages before relying on it.
void WorkerThreadPool::Task::free_template_userdata() {
	ERR_FAIL_NULL(template_userdata);
	ERR_FAIL_NULL(native_func_userdata);
	BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata;
	memdelete(btu);
}

WorkerThreadPool *WorkerThreadPool::singleton = nullptr;

#ifdef THREADS_ENABLED
// Per-thread slots of mutexes that the pool is allowed to temporarily unlock
// while a caller blocks inside the pool (see *_unlock_allowance_zone below).
thread_local WorkerThreadPool::UnlockableLocks WorkerThreadPool::unlockable_locks[MAX_UNLOCKABLE_LOCKS];
#endif

// Runs one task (or one slice of a group task) on the calling thread.
// Called from pool threads, recursively while waiting collaboratively, and
// directly from the posting thread when the pool has no worker threads.
void WorkerThreadPool::_process_task(Task *p_task) {
#ifdef THREADS_ENABLED
	int pool_thread_index = thread_ids[Thread::get_caller_id()];
	ThreadData &curr_thread = threads[pool_thread_index];
	Task *prev_task = nullptr; // In case this is recursively called.

	// Back up per-thread state that tasks are allowed to clobber (restored at the end).
	bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
	CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr;

	{
		// Tasks must start with these at default values. They are free to set-and-forget otherwise.
		set_current_thread_safe_for_nodes(false);
		MessageQueue::set_thread_singleton_override(nullptr);

		// Since the WorkerThreadPool is started before the script server,
		// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
		// Therefore, we do it late at the first opportunity, so in case the task
		// about to be run uses scripting, guarantees are held.
		ScriptServer::thread_enter();

		task_mutex.lock();
		p_task->pool_thread_index = pool_thread_index;
		prev_task = curr_thread.current_task;
		curr_thread.current_task = p_task;
		curr_thread.has_pump_task = p_task->is_pump_task;
		if (p_task->pending_notify_yield_over) {
			// notify_yield_over() arrived before the task started running; honor it now.
			curr_thread.yield_is_over = true;
		}
		task_mutex.unlock();
	}
#endif

#ifdef THREADS_ENABLED
	// Read before running the task: for group tasks, p_task may be freed below.
	bool low_priority = p_task->low_priority;
#endif

	if (p_task->group) {
		// Handling a group
		bool do_post = false;

		while (true) {
			// Atomically claim the next element index; multiple pool threads
			// drain the same group concurrently.
			uint32_t work_index = p_task->group->index.postincrement();

			if (work_index >= p_task->group->max) {
				break;
			}
			if (p_task->native_group_func) {
				p_task->native_group_func(p_task->native_func_userdata, work_index);
			} else if (p_task->template_userdata) {
				p_task->template_userdata->callback_indexed(work_index);
			} else {
				p_task->callable.call(work_index);
			}

			// This is the only way to ensure posting is done when all tasks are really complete.
			uint32_t completed_amount = p_task->group->completed_index.increment();

			if (completed_amount == p_task->group->max) {
				do_post = true;
			}
		}

		if (do_post && p_task->template_userdata) {
			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
		}

		if (do_post) {
			p_task->group->done_semaphore.post();
			p_task->group->completed.set_to(true);
		}
		uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
		uint32_t finished_users = p_task->group->finished.increment();

		if (finished_users == max_users) {
			// Get rid of the group, because nobody else is using it.
			MutexLock task_lock(task_mutex);
			group_allocator.free(p_task->group);
		}

		// For groups, tasks get rid of themselves.

		task_mutex.lock();
		task_allocator.free(p_task);
	} else {
		// Single task: dispatch to whichever callback form was provided.
		if (p_task->native_func) {
			p_task->native_func(p_task->native_func_userdata);
		} else if (p_task->template_userdata) {
			p_task->template_userdata->callback();
			memdelete(p_task->template_userdata);
		} else {
			p_task->callable.call();
		}

		task_mutex.lock();
		p_task->completed = true;
		p_task->pool_thread_index = -1;
		if (p_task->waiting_user) {
			// Wake every user thread blocked in wait_for_task_completion().
			p_task->done_semaphore.post(p_task->waiting_user);
		}
		// Let awaiters know.
		for (uint32_t i = 0; i < threads.size(); i++) {
			if (threads[i].awaited_task == p_task) {
				threads[i].cond_var.notify_one();
				threads[i].signaled = true;
			}
		}
	}

	// NOTE: task_mutex is held here (locked in both branches above); it is
	// released in the block below when THREADS_ENABLED is defined.
#ifdef THREADS_ENABLED
	{
		curr_thread.current_task = prev_task;
		if (low_priority) {
			low_priority_threads_used--;

			// A low-priority slot opened up; move a queued low-priority task
			// into the main queue if there is one.
			if (_try_promote_low_priority_task()) {
				if (prev_task) { // Otherwise, this thread will catch it.
					_notify_threads(&curr_thread, 1, 0);
				}
			}
		}

		task_mutex.unlock();
	}

	// Restore the per-thread state the task was allowed to clobber.
	set_current_thread_safe_for_nodes(safe_for_nodes_backup);
	MessageQueue::set_thread_singleton_override(call_queue_backup);
#endif
}

// Main loop of each pool thread: wait for tasks on the queue and process them
// until the runlevel tells us to exit.
void WorkerThreadPool::_thread_function(void *p_user) {
	ThreadData *thread_data = (ThreadData *)p_user;
	Thread::set_name(vformat("WorkerThread %d", thread_data->index));

	while (true) {
		Task *task_to_process = nullptr;
		{
			// Create the lock outside the inner loop so it isn't needlessly unlocked and relocked
			// when no task was found to process, and the loop is re-entered.
			MutexLock lock(thread_data->pool->task_mutex);

			while (true) {
				bool exit = thread_data->pool->_handle_runlevel(thread_data, lock);
				if (unlikely(exit)) {
					return;
				}

				thread_data->signaled = false;

				if (!thread_data->pool->task_queue.first()) {
					// There wasn't a task available yet.
					// Let's wait for the next notification, then recheck.
					thread_data->cond_var.wait(lock);
					continue;
				}

				// Got a task to process! Remove it from the queue, then break into the task handling section.
				task_to_process = thread_data->pool->task_queue.first()->self();
				thread_data->pool->task_queue.remove(thread_data->pool->task_queue.first());
				break;
			}
		}

		DEV_ASSERT(task_to_process);
		thread_data->pool->_process_task(task_to_process);
	}
}

// Enqueues p_count already-allocated tasks (high priority to the main queue;
// low priority to the main queue only while under the low-priority budget,
// otherwise to the low-priority backlog), then notifies worker threads.
// Must be called with task_mutex held via p_lock.
void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task) {
	// Fall back to processing on the calling thread if there are no worker threads.
	// Separated into its own variable to make it easier to extend this logic
	// in custom builds.

	// Avoid calling pump tasks or low priority tasks from the calling thread.
	bool process_on_calling_thread = threads.is_empty() && !p_pump_task;
	if (process_on_calling_thread) {
		p_lock.temp_unlock();
		for (uint32_t i = 0; i < p_count; i++) {
			_process_task(p_tasks[i]);
		}
		p_lock.temp_relock();
		return;
	}

	// During shutdown's language-exit phase, hold new work until the runlevel advances.
	while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
		control_cond_var.wait(p_lock);
	}

	uint32_t to_process = 0;
	uint32_t to_promote = 0;

	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;

	for (uint32_t i = 0; i < p_count; i++) {
		p_tasks[i]->low_priority = !p_high_priority;
		if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
			task_queue.add_last(&p_tasks[i]->task_elem);
			if (!p_high_priority) {
				low_priority_threads_used++;
			}
			to_process++;
		} else {
			// Too many threads using low priority, must go to queue.
			low_priority_task_queue.add_last(&p_tasks[i]->task_elem);
			to_promote++;
		}
	}

	_notify_threads(caller_pool_thread, to_process, to_promote);
}

// Wakes up to p_process_count threads to process queued tasks and up to
// p_promote_count threads able to promote low-priority tasks.
// Must be called with task_mutex held.
void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
	uint32_t to_process = p_process_count;
	uint32_t to_promote = p_promote_count;

	// This is where which threads are awaken is decided according to the workload.
	// Threads that will anyway have a chance to check the situation and process/promote tasks
	// are excluded from being notified. Others will be tried anyway to try to distribute load.
	// The current thread, if is a pool thread, is also excluded depending on the promoting/processing
	// needs because it will anyway loop again. However, it will contribute to decreasing the count,
	// which helps reducing sync traffic.

	uint32_t thread_count = threads.size();

	// First round:
	// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.
	// 2. For promoting: since it's exclusive with processing, we find threads able to promote low-prio tasks now.
	for (uint32_t i = 0;
			i < thread_count && (to_process || to_promote);
			i++, notify_index = (notify_index + 1) % thread_count) {
		ThreadData &th = threads[notify_index];

		if (th.signaled) {
			continue;
		}
		if (th.current_task) {
			// Good thread for promoting low-prio?
			if (to_promote && th.awaited_task && th.current_task->low_priority) {
				if (likely(&th != p_current_thread_data)) {
					th.cond_var.notify_one();
				}
				th.signaled = true;
				to_promote--;
			}
		} else {
			if (to_process) {
				if (likely(&th != p_current_thread_data)) {
					th.cond_var.notify_one();
				}
				th.signaled = true;
				to_process--;
			}
		}
	}

	// Second round:
	// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.
	for (uint32_t i = 0;
			i < thread_count && to_process;
			i++, notify_index = (notify_index + 1) % thread_count) {
		ThreadData &th = threads[notify_index];

		if (th.signaled) {
			continue;
		}
		if (th.awaited_task) {
			if (likely(&th != p_current_thread_data)) {
				th.cond_var.notify_one();
			}
			th.signaled = true;
			to_process--;
		}
	}
}

// Moves the oldest queued low-priority task into the main queue, if any.
// Returns true on promotion. Must be called with task_mutex held.
bool WorkerThreadPool::_try_promote_low_priority_task() {
	if (low_priority_task_queue.first()) {
		Task *low_prio_task = low_priority_task_queue.first()->self();
		low_priority_task_queue.remove(low_priority_task_queue.first());
		task_queue.add_last(&low_prio_task->task_elem);
		low_priority_threads_used++;
		return true;
	} else {
		return false;
	}
}

// Public entry point for native (function-pointer) tasks.
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}

// Allocates and registers a Task for any of the supported callback forms
// (Callable, native function, or template userdata) and posts it.
// Returns the new task's ID.
WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task) {
	MutexLock<BinaryMutex> lock(task_mutex);

	// Get a free task
	Task *task = task_allocator.alloc();
	TaskID id = last_task++;
	task->self = id;
	task->callable = p_callable;
	task->native_func = p_func;
	task->native_func_userdata = p_userdata;
	task->description = p_description;
	task->template_userdata = p_template_userdata;
	task->is_pump_task = p_pump_task;
	tasks.insert(id, task);

#ifdef THREADS_ENABLED
	if (p_pump_task) {
		// Pump tasks are long-lived and effectively monopolize a thread; if they
		// would occupy all threads, grow the pool by one so regular tasks can still run.
		pump_task_count++;
		int thread_count = get_thread_count();
		if (pump_task_count >= thread_count) {
			print_verbose(vformat("A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads.", pump_task_count + 1, thread_count)); // +1 because we want to keep a Thread without any pump tasks free.

			// Re-sizing implies relocation, which is not supported for this array.
			CRASH_COND_MSG(thread_count + 1 > (int)threads.get_capacity(), "Reserve trick for worker thread pool failed. Crashing.");
			threads.resize_initialized(thread_count + 1);
			threads[thread_count].index = thread_count;
			threads[thread_count].pool = this;
			threads[thread_count].thread.start(&WorkerThreadPool::_thread_function, &threads[thread_count]);
			thread_ids.insert(threads[thread_count].thread.get_id(), thread_count);
		}
	}
#endif

	_post_tasks(&task, 1, p_high_priority, lock, p_pump_task);

	return id;
}

WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, p_pump_task);
}

// Script-facing variant (bound in _bind_methods); never a pump task.
WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind(const Callable &p_action, bool p_high_priority, const String &p_description) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, false);
}

// Returns whether the task has finished running. Fails (returning false) on an unknown ID.
bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
	MutexLock task_lock(task_mutex);
	const Task *const *taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task
	}

	return (*taskp)->completed;
}

// Blocks until the given task completes.
// Returns ERR_INVALID_PARAMETER for unknown IDs and ERR_BUSY when called from
// a pool thread on a task at least as old as the one it is running (deadlock
// prevention; see below). Pool threads wait collaboratively (keep processing
// other tasks); user threads wait on the task's semaphore.
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
	task_mutex.lock();
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
	}
	Task *task = *taskp;

	if (task->completed) {
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			// Last interested party; reclaim the task record.
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
		task_mutex.unlock();
		return OK;
	}

	ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;
	if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {
		// Deadlock prevention:
		// When a pool thread wants to wait for an older task, the following situations can happen:
		// 1. Awaited task is deep in the stack of the awaiter.
		// 2. A group of awaiter threads end up depending on some tasks buried in the stack
		//    of their worker threads in such a way that progress can't be made.
		// Both would entail a deadlock. Some may be handled here in the WorkerThreadPool
		// with some extra logic and bookkeeping. However, there would still be unavoidable
		// cases of deadlock because of the way waiting threads process outstanding tasks.
		// Taking into account there's no feasible solution for every possible case
		// with the current design, we just simply reject attempts to await on older tasks,
		// with a specific error code that signals the situation so the caller can handle it.
		task_mutex.unlock();
		return ERR_BUSY;
	}

	if (caller_pool_thread) {
		task->waiting_pool++;
	} else {
		task->waiting_user++;
	}

	if (caller_pool_thread) {
		task_mutex.unlock();
		_wait_collaboratively(caller_pool_thread, task);
		task_mutex.lock();
		task->waiting_pool--;
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
	} else {
		task_mutex.unlock();
		task->done_semaphore.wait();
		task_mutex.lock();
		task->waiting_user--;
		if (task->waiting_pool == 0 && task->waiting_user == 0) {
			tasks.erase(p_task_id);
			task_allocator.free(task);
		}
	}

	task_mutex.unlock();
	return OK;
}

// Re-acquires every mutex registered in this thread's unlock-allowance slots.
void WorkerThreadPool::_lock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
	for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
		if (unlockable_locks[i].ulock) {
			unlockable_locks[i].ulock->lock();
		}
	}
#endif
}

// Releases every mutex registered in this thread's unlock-allowance slots,
// so other threads can make progress while this one blocks inside the pool.
void WorkerThreadPool::_unlock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
	for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
		if (unlockable_locks[i].ulock) {
			unlockable_locks[i].ulock->unlock();
		}
	}
#endif
}

// Pool-thread wait: instead of blocking idly, keep processing other queued
// tasks until p_task completes (or, for ThreadData::YIELDING, until the yield
// is flagged over via notify_yield_over()).
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
	// Keep processing tasks until the condition to stop waiting is met.

	while (true) {
		Task *task_to_process = nullptr;
		bool relock_unlockables = false;
		{
			MutexLock lock(task_mutex);

			bool was_signaled = p_caller_pool_thread->signaled;
			p_caller_pool_thread->signaled = false;

			bool exit = _handle_runlevel(p_caller_pool_thread, lock);
			if (unlikely(exit)) {
				break;
			}

			bool wait_is_over = false;
			if (unlikely(p_task == ThreadData::YIELDING)) {
				if (p_caller_pool_thread->yield_is_over) {
					p_caller_pool_thread->yield_is_over = false;
					wait_is_over = true;
				}
			} else {
				if (p_task->completed) {
					wait_is_over = true;
				}
			}

			if (wait_is_over) {
				if (was_signaled) {
					// This thread was awaken for some additional reason, but it's about to exit.
					// Let's find out what may be pending and forward the requests.
					uint32_t to_process = task_queue.first() ? 1 : 0;
					uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
					if (to_process || to_promote) {
						// This thread must be left alone since it won't loop again.
						p_caller_pool_thread->signaled = true;
						_notify_threads(p_caller_pool_thread, to_process, to_promote);
					}
				}

				break;
			}

			if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
				if (_try_promote_low_priority_task()) {
					_notify_threads(p_caller_pool_thread, 1, 0);
				}
			}

			if (p_caller_pool_thread->pool->task_queue.first()) {
				task_to_process = task_queue.first()->self();
				if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true) && task_to_process->is_pump_task) {
					// Don't pick up a pump task from a yielding thread or one that
					// already has a pump task; pass it on to another thread instead.
					task_to_process = nullptr;
					_notify_threads(p_caller_pool_thread, 1, 0);
				} else {
					task_queue.remove(task_queue.first());
				}
			}

			if (!task_to_process) {
				// Nothing to do right now; record what we're awaiting and sleep.
				p_caller_pool_thread->awaited_task = p_task;

				if (this == singleton) {
					_unlock_unlockable_mutexes();
				}
				relock_unlockables = true;

				p_caller_pool_thread->cond_var.wait(lock);

				p_caller_pool_thread->awaited_task = nullptr;
			}
		}

		// Re-acquire outside task_mutex to keep lock ordering simple.
		if (relock_unlockables && this == singleton) {
			_lock_unlockable_mutexes();
		}

		if (task_to_process) {
			_process_task(task_to_process);
		}
	}
}

// Advances the shutdown state machine and wakes everything up so each thread
// re-evaluates the new runlevel. Must be called with task_mutex held.
// Runlevels only ever move forward (see DEV_ASSERT).
void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
	DEV_ASSERT(p_runlevel > runlevel);
	runlevel = p_runlevel;
	memset(&runlevel_data, 0, sizeof(runlevel_data));
	for (uint32_t i = 0; i < threads.size(); i++) {
		threads[i].cond_var.notify_one();
		threads[i].signaled = true;
	}
	control_cond_var.notify_all();
}

// Returns whether threads have to exit. This may perform the check about handling needed.
bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
	bool exit = false;
	switch (runlevel) {
		case RUNLEVEL_NORMAL: {
		} break;
		case RUNLEVEL_PRE_EXIT_LANGUAGES: {
			// Count this thread as idle once (only when both queues are drained).
			if (!p_thread_data->pre_exited_languages) {
				if (!task_queue.first() && !low_priority_task_queue.first()) {
					p_thread_data->pre_exited_languages = true;
					runlevel_data.pre_exit_languages.num_idle_threads++;
					control_cond_var.notify_all();
				}
			}
		} break;
		case RUNLEVEL_EXIT_LANGUAGES: {
			// Detach this thread from scripting languages exactly once.
			// thread_exit() is called outside task_mutex (temp_unlock) since it may block.
			if (!p_thread_data->exited_languages) {
				p_lock.temp_unlock();
				ScriptServer::thread_exit();
				p_lock.temp_relock();
				p_thread_data->exited_languages = true;
				runlevel_data.exit_languages.num_exited_threads++;
				control_cond_var.notify_all();
			}
		} break;
		case RUNLEVEL_EXIT: {
			exit = true;
		} break;
	}
	return exit;
}

// Lets a long-running task temporarily give its thread back to the pool; the
// thread keeps processing other tasks until notify_yield_over() is called for
// this task. Only valid from a worker thread.
void WorkerThreadPool::yield() {
	int th_index = get_thread_index();
	ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
	_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);

	task_mutex.lock();
	if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
		// If this long-lived task started before the scripting server was initialized,
		// now is a good time to have scripting languages ready for the current thread.
		// Otherwise, such a piece of setup won't happen unless another task has been
		// run during the collaborative wait.
		task_mutex.unlock();
		ScriptServer::thread_enter();
	} else {
		task_mutex.unlock();
	}
}

// Ends a yield() for the given task, waking the thread that is collaboratively
// waiting on it. Safe to call before the task has started (flag is deferred).
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
	MutexLock task_lock(task_mutex);
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		ERR_FAIL_MSG("Invalid Task ID.");
	}
	Task *task = *taskp;
	if (task->pool_thread_index == -1) { // Completed or not started yet.
		if (!task->completed) {
			// This avoids a race condition where a task is created and yield-over called before it's processed.
			task->pending_notify_yield_over = true;
		}
		return;
	}

	ThreadData &td = threads[task->pool_thread_index];
	td.yield_is_over = true;
	td.signaled = true;
	td.cond_var.notify_one();
}

// Creates a group of p_tasks cooperating tasks that together process
// p_elements indexed work items, and posts them. p_tasks < 0 means
// "one per pool thread". Returns the group's ID.
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
	if (p_tasks < 0) {
		p_tasks = MAX(1u, threads.size());
	}

	MutexLock<BinaryMutex> lock(task_mutex);

	Group *group = group_allocator.alloc();
	GroupID id = last_task++;
	group->max = p_elements;
	group->self = id;

	Task **tasks_posted = nullptr;
	if (p_elements == 0) {
		// Should really not call it with zero Elements, but at least it should work.
		group->completed.set_to(true);
		group->done_semaphore.post();
		group->tasks_used = 0;
		p_tasks = 0;
		if (p_template_userdata) {
			memdelete(p_template_userdata);
		}

	} else {
		group->tasks_used = p_tasks;
		// Stack allocation is fine: the array is only needed until _post_tasks() returns.
		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
		for (int i = 0; i < p_tasks; i++) {
			Task *task = task_allocator.alloc();
			task->native_group_func = p_func;
			task->native_func_userdata = p_userdata;
			task->description = p_description;
			task->group = group;
			task->callable = p_callable;
			task->template_userdata = p_template_userdata;
			tasks_posted[i] = task;
			// No task ID is used.
		}
	}

	groups[id] = group;

	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock, false);

	return id;
}

// Public entry point for native (function-pointer) group tasks.
WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

// Public entry point for Callable-based group tasks.
WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

// Returns how many of the group's elements have been processed so far.
uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {
	MutexLock task_lock(task_mutex);
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		ERR_FAIL_V_MSG(0, "Invalid Group ID");
	}
	return (*groupp)->completed_index.get();
}

// Returns whether all elements of the group have been processed.
bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
	MutexLock task_lock(task_mutex);
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		ERR_FAIL_V_MSG(false, "Invalid Group ID");
	}
	return (*groupp)->completed.is_set();
}

// Blocks until all tasks of the group have completed, then unregisters the group.
void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
#ifdef THREADS_ENABLED
	task_mutex.lock();
	Group **groupp = groups.getptr(p_group);
	task_mutex.unlock();
	if (!groupp) {
		ERR_FAIL_MSG("Invalid Group ID.");
	}

	{
		Group *group = *groupp;

		if (this == singleton) {
			_unlock_unlockable_mutexes();
		}
		group->done_semaphore.wait();
		if (this == singleton) {
			_lock_unlockable_mutexes();
		}

		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.

		if (finished_users == max_users) {
			// All tasks using this group are gone (finished before the group), so clear the group too.
			MutexLock task_lock(task_mutex);
			group_allocator.free(group);
		}
	}

	MutexLock task_lock(task_mutex); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.
	groups.erase(p_group);
#endif
}

// Returns the pool-thread index of the calling thread, or -1 if the caller
// is not a pool thread.
int WorkerThreadPool::get_thread_index() const {
	Thread::ID tid = Thread::get_caller_id();
	return thread_ids.has(tid) ? thread_ids[tid] : -1;
}

// Returns the TaskID being run by the calling pool thread, or INVALID_TASK_ID.
WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() const {
	int th_index = get_thread_index();
	if (th_index != -1 && threads[th_index].current_task) {
		return threads[th_index].current_task->self;
	} else {
		return INVALID_TASK_ID;
	}
}

// Returns the GroupID being run by the calling pool thread, or INVALID_TASK_ID.
WorkerThreadPool::GroupID WorkerThreadPool::get_caller_group_id() const {
	int th_index = get_thread_index();
	if (th_index != -1 && threads[th_index].current_task && threads[th_index].current_task->group) {
		return threads[th_index].current_task->group->self;
	} else {
		return INVALID_TASK_ID;
	}
}

#ifdef THREADS_ENABLED
// Registers p_ulock in a free (or matching) per-thread slot so the pool may
// temporarily release it while this thread blocks inside the pool.
// Slots are reference-counted to support nested zones on the same lock.
// Returns the slot index (zone id) or UINT32_MAX if no slot is free.
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock) {
	for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
		DEV_ASSERT((bool)unlockable_locks[i].ulock == (bool)unlockable_locks[i].rc);
		if (unlockable_locks[i].ulock == &p_ulock) {
			// Already registered in the current thread.
			unlockable_locks[i].rc++;
			return i;
		} else if (!unlockable_locks[i].ulock) {
			unlockable_locks[i].ulock = &p_ulock;
			unlockable_locks[i].rc = 1;
			return i;
		}
	}
	ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable lock slots available. Engine bug.");
}

// Drops one reference from the given zone; frees the slot at refcount zero.
void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
	DEV_ASSERT(unlockable_locks[p_zone_id].ulock && unlockable_locks[p_zone_id].rc);
	unlockable_locks[p_zone_id].rc--;
	if (unlockable_locks[p_zone_id].rc == 0) {
		unlockable_locks[p_zone_id].ulock = nullptr;
	}
}
#endif

// Starts the pool with p_thread_count threads (negative means the OS default)
// and a low-priority budget derived from p_low_priority_task_ratio.
// Must not be called on an already-initialized pool.
void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
	ERR_FAIL_COND(threads.size() > 0);

	runlevel = RUNLEVEL_NORMAL;

	if (p_thread_count < 0) {
		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
	}

	// Keep at least 1 low-priority slot but always leave one thread for high priority.
	max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);

	print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads));

#ifdef THREADS_ENABLED
	// Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks.
	// We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed.
	threads.reserve(5);
#endif
	threads.resize(p_thread_count);

	for (uint32_t i = 0; i < threads.size(); i++) {
		threads[i].index = i;
		threads[i].pool = this;
		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
		thread_ids.insert(threads[i].thread.get_id(), i);
	}
}

// First stage of shutdown: waits for all threads to go idle, then has each
// detach from the scripting languages (see _handle_runlevel()).
void WorkerThreadPool::exit_languages_threads() {
	if (threads.is_empty()) {
		return;
	}

	MutexLock lock(task_mutex);

	// Wait until all threads are idle.
	_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
	while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
		control_cond_var.wait(lock);
	}

	// Wait until all threads have detached from scripting languages.
	_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
	while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
		control_cond_var.wait(lock);
	}
}

// Final stage of shutdown: tells all threads to exit, joins them, and frees
// any task records still registered.
void WorkerThreadPool::finish() {
	if (threads.is_empty()) {
		return;
	}

	{
		MutexLock lock(task_mutex);
		// Low-priority tasks still queued at shutdown were never waited on; report them.
		SelfList<Task> *E = low_priority_task_queue.first();
		while (E) {
			print_error("Task waiting was never re-claimed: " + E->self()->description);
			E = E->next();
		}

		_switch_runlevel(RUNLEVEL_EXIT);
	}

	for (ThreadData &data : threads) {
		data.thread.wait_to_finish();
	}

	{
		MutexLock lock(task_mutex);
		for (KeyValue<TaskID, Task *> &E : tasks) {
			task_allocator.free(E.value);
		}
	}

	threads.clear();
}

// Script API bindings.
void WorkerThreadPool::_bind_methods() {
	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task_bind, DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);
	ClassDB::bind_method(D_METHOD("get_caller_task_id"), &WorkerThreadPool::get_caller_task_id);

	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
	ClassDB::bind_method(D_METHOD("get_caller_group_id"), &WorkerThreadPool::get_caller_group_id);
}

// Returns the pool registered under p_name, creating and initializing it on
// first use. Created pools are non-singleton and owned by named_pools
// (freed in the singleton's destructor).
WorkerThreadPool *WorkerThreadPool::get_named_pool(const StringName &p_name) {
	WorkerThreadPool **pool_ptr = named_pools.getptr(p_name);
	if (pool_ptr) {
		return *pool_ptr;
	} else {
		WorkerThreadPool *pool = memnew(WorkerThreadPool(false));
		pool->init();
		named_pools[p_name] = pool;
		return pool;
	}
}

WorkerThreadPool::WorkerThreadPool(bool p_singleton) {
	if (p_singleton) {
		singleton = this;
	}
}

// Shuts the pool down; the singleton additionally tears down all named pools.
WorkerThreadPool::~WorkerThreadPool() {
	finish();

	if (this == singleton) {
		singleton = nullptr;
		for (KeyValue<StringName, WorkerThreadPool *> &E : named_pools) {
			E.value->finish();
			memdelete(E.value);
		}
		named_pools.clear();
	}
}