Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.cpp
9912 views
// Copyright 2009-2021 Intel Corporation1// SPDX-License-Identifier: Apache-2.023#include "taskschedulerinternal.h"4#include "../math/emath.h"5#include "../sys/sysinfo.h"6#include <algorithm>78namespace embree9{10RTC_NAMESPACE_BEGIN1112static MutexSys g_mutex;13size_t TaskScheduler::g_numThreads = 0;14__thread TaskScheduler* TaskScheduler::g_instance = nullptr;15std::vector<Ref<TaskScheduler>> g_instance_vector;16__thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr;17TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;1819template<typename Predicate, typename Body>20__forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)21{22while (true)23{24/*! some rounds that yield */25for (size_t i=0; i<32; i++)26{27/*! some spinning rounds */28const size_t threadCount = thread.threadCount();29for (size_t j=0; j<1024; j+=threadCount)30{31if (!pred()) return;32if (thread.scheduler->steal_from_other_threads(thread)) {33i=j=0;34body();35}36}37yield();38}39}40}4142/*! run this task */43void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible44{45/* try to run if not already stolen */46if (try_switch_state(INITIALIZED,DONE))47{48Task* prevTask = thread.task;49thread.task = this;50//try {51// if (context->cancellingException == nullptr)52closure->execute();53//} catch (...) {54// if (context->cancellingException == nullptr)55// context->cancellingException = std::current_exception();56//}57thread.task = prevTask;58add_dependencies(-1);59}6061/* steal until all dependencies have completed */62steal_loop(thread,63[&] () { return dependencies>0; },64[&] () { while (thread.tasks.execute_local_internal(thread,this)); });6566/* now signal our parent task that we are finished */67if (parent)68parent->add_dependencies(-1);69}7071/*! run this task */72dll_export void TaskScheduler::Task::run (Thread& thread) {73run_internal(thread);74}7576bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)77{78/* stop if we run out of local tasks or reach the waiting task */79if (right == 0 || &tasks[right-1] == parent)80return false;8182/* execute task */83size_t oldRight = right;84tasks[right-1].run_internal(thread);85if (right != oldRight) {86THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");87}8889/* pop task and closure from stack */90right--;91if (tasks[right].stackPtr != size_t(-1))92stackPtr = tasks[right].stackPtr;9394/* also move left pointer */95if (left >= right) left.store(right.load());9697return right != 0;98}99100dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {101return execute_local_internal(thread,parent);102}103104bool TaskScheduler::TaskQueue::steal(Thread& thread)105{106size_t l = left;107size_t r = right;108if (l < r)109{110l = left++;111if (l >= r)112return false;113}114else115return false;116117if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))118return false;119120thread.tasks.right++;121return true;122}123124/* we steal from the left */125size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()126{127if (left >= right) return 0;128return tasks[left].N;129}130131void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)132{133TaskScheduler::ThreadPool* pool = pair->first;134size_t threadIndex = pair->second;135delete pair;136pool->thread_loop(threadIndex);137}138139TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)140: numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}141142dll_export void TaskScheduler::ThreadPool::startThreads()143{144if (running) return;145setNumThreads(numThreads,true);146}147148void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)149{150Lock<MutexSys> lock(g_mutex);151assert(newNumThreads);152if (newNumThreads == std::numeric_limits<size_t>::max())153newNumThreads = (size_t) getNumberOfLogicalThreads();154155numThreads = newNumThreads;156if (!startThreads && !running) return;157running = true;158size_t numThreadsActive = numThreadsRunning;159160mutex.lock();161numThreadsRunning = newNumThreads;162mutex.unlock();163condition.notify_all();164165/* start new threads */166for (size_t t=numThreadsActive; t<numThreads; t++)167{168if (t == 0) continue;169auto pair = new std::pair<TaskScheduler::ThreadPool*,size_t>(this,t);170threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));171}172173/* stop some threads if we reduce the number of threads */174for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {175if (t == 0) continue;176embree::join(threads.back());177threads.pop_back();178}179}180181TaskScheduler::ThreadPool::~ThreadPool()182{183/* leave all taskschedulers */184mutex.lock();185numThreadsRunning = 0;186mutex.unlock();187condition.notify_all();188189/* wait for threads to terminate */190for (size_t i=0; i<threads.size(); i++)191embree::join(threads[i]);192}193194dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)195{196mutex.lock();197schedulers.push_back(scheduler);198mutex.unlock();199condition.notify_all();200}201202dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)203{204Lock<MutexSys> lock(mutex);205for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {206if (scheduler == *it) {207schedulers.erase(it);208return;209}210}211}212213void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)214{215while (globalThreadIndex < numThreadsRunning)216{217Ref<TaskScheduler> scheduler = NULL;218ssize_t threadIndex = -1;219{220Lock<MutexSys> lock(mutex);221condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });222if (globalThreadIndex >= numThreadsRunning) break;223scheduler = schedulers.front();224threadIndex = scheduler->allocThreadIndex();225}226scheduler->thread_loop(threadIndex);227}228}229230TaskScheduler::TaskScheduler()231: threadCounter(0), anyTasksRunning(0), hasRootTask(false)232{233assert(threadPool);234threadLocal.resize(2 * TaskScheduler::threadCount()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.235for (size_t i=0; i<threadLocal.size(); i++)236threadLocal[i].store(nullptr);237}238239TaskScheduler::~TaskScheduler()240{241assert(threadCounter == 0);242}243244dll_export size_t TaskScheduler::threadID()245{246Thread* thread = TaskScheduler::thread();247if (thread) return thread->threadIndex;248else return 0;249}250251dll_export size_t TaskScheduler::threadIndex()252{253Thread* thread = TaskScheduler::thread();254if (thread) return thread->threadIndex;255else return 0;256}257258dll_export size_t TaskScheduler::threadCount() {259return threadPool->size();260}261262dll_export TaskScheduler* TaskScheduler::instance()263{264if (g_instance == NULL) {265Lock<MutexSys> lock(g_mutex);266g_instance = new TaskScheduler;267g_instance_vector.push_back(g_instance);268}269return g_instance;270}271272void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)273{274if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);275threadPool->setNumThreads(numThreads,start_threads);276}277278void TaskScheduler::destroy() {279delete threadPool; threadPool = nullptr;280}281282dll_export ssize_t TaskScheduler::allocThreadIndex()283{284size_t threadIndex = threadCounter++;285assert(threadIndex < threadLocal.size());286return threadIndex;287}288289void TaskScheduler::join()290{291mutex.lock();292size_t threadIndex = allocThreadIndex();293condition.wait(mutex, [&] () { return hasRootTask.load(); });294mutex.unlock();295thread_loop(threadIndex);296}297298void TaskScheduler::reset() {299hasRootTask = false;300}301302void TaskScheduler::wait_for_threads(size_t threadCount)303{304while (threadCounter < threadCount-1)305pause_cpu();306}307308dll_export TaskScheduler::Thread* TaskScheduler::thread() {309return thread_local_thread;310}311312dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)313{314Thread* old = thread_local_thread;315thread_local_thread = thread;316return old;317}318319dll_export void TaskScheduler::wait()320{321Thread* thread = TaskScheduler::thread();322if (thread == nullptr)323return;324while (thread->tasks.execute_local_internal(*thread,thread->task)) {};325}326327void TaskScheduler::thread_loop(size_t threadIndex)328{329/* allocate thread structure */330std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation331Thread& thread = *mthread;332threadLocal[threadIndex].store(&thread);333Thread* oldThread = swapThread(&thread);334335/* main thread loop */336while (anyTasksRunning)337{338steal_loop(thread,339[&] () { return anyTasksRunning > 0; },340[&] () {341anyTasksRunning++;342while (thread.tasks.execute_local_internal(thread,nullptr));343anyTasksRunning--;344});345}346threadLocal[threadIndex].store(nullptr);347swapThread(oldThread);348349/* wait for all threads to terminate */350threadCounter--;351#if defined(__WIN32__)352size_t loopIndex = 1;353#endif354#define LOOP_YIELD_THRESHOLD (4096)355while (threadCounter > 0) {356#if defined(__WIN32__)357if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)358yield();359else360_mm_pause();361loopIndex++;362#else363yield();364#endif365}366}367368bool TaskScheduler::steal_from_other_threads(Thread& thread)369{370const size_t threadIndex = thread.threadIndex;371const size_t threadCount = this->threadCounter;372373for (size_t i=1; i<threadCount; i++)374{375pause_cpu(32);376size_t otherThreadIndex = threadIndex+i;377if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;378379Thread* othread = threadLocal[otherThreadIndex].load();380if (!othread)381continue;382383if (othread->tasks.steal(thread))384return true;385}386387return false;388}389390dll_export void TaskScheduler::startThreads() {391threadPool->startThreads();392}393394dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {395threadPool->add(scheduler);396}397398dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {399threadPool->remove(scheduler);400}401402RTC_NAMESPACE_END403}404405406