// Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.h
// 9912 views
// Copyright 2009-2021 Intel Corporation1// SPDX-License-Identifier: Apache-2.023#pragma once45#include "../../include/embree4/rtcore.h"6#include "../sys/platform.h"7#include "../sys/alloc.h"8#include "../sys/barrier.h"9#include "../sys/thread.h"10#include "../sys/mutex.h"11#include "../sys/condition.h"12#include "../sys/ref.h"13#include "../sys/atomic.h"14#include "../math/range.h"1516#include <list>1718namespace embree19{2021/* The tasking system exports some symbols to be used by the tutorials. Thus we22hide is also in the API namespace when requested. */23RTC_NAMESPACE_BEGIN2425struct TaskScheduler : public RefCount26{27ALIGNED_STRUCT_(64);28friend class Device;2930static const size_t TASK_STACK_SIZE = 4*1024; //!< task structure stack31static const size_t CLOSURE_STACK_SIZE = 512*1024; //!< stack for task closures3233struct Thread;3435/*! virtual interface for all tasks */36struct TaskFunction {37virtual void execute() = 0;38};394041struct TaskGroupContext {42TaskGroupContext() : cancellingException(nullptr) {}4344std::exception_ptr cancellingException;45};4647/*! builds a task interface from a closure */48template<typename Closure>49struct ClosureTaskFunction : public TaskFunction50{51Closure closure;52__forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}53void execute() { closure(); };54};5556struct __aligned(64) Task57{58/*! states a task can be in */59enum { DONE, INITIALIZED };6061/*! switch from one state to another */62__forceinline void switch_state(int from, int to)63{64__memory_barrier();65MAYBE_UNUSED bool success = state.compare_exchange_strong(from,to);66assert(success);67}6869/*! try to switch from one state to another */70__forceinline bool try_switch_state(int from, int to) {71__memory_barrier();72return state.compare_exchange_strong(from,to);73}7475/*! increment/decrement dependency counter */76void add_dependencies(int n) {77dependencies+=n;78}7980/*! 
initialize all tasks to DONE state by default */81__forceinline Task()82: state(DONE) {}8384/*! construction of new task */85__forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context, size_t stackPtr, size_t N)86: dependencies(1), stealable(true), closure(closure), parent(parent), context(context), stackPtr(stackPtr), N(N)87{88if (parent) parent->add_dependencies(+1);89switch_state(DONE,INITIALIZED);90}9192/*! construction of stolen task, stealing thread will decrement initial dependency */93__forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context)94: dependencies(1), stealable(false), closure(closure), parent(parent), context(context), stackPtr(-1), N(1)95{96switch_state(DONE,INITIALIZED);97}9899/*! try to steal this task */100bool try_steal(Task& child)101{102if (!stealable) return false;103if (!try_switch_state(INITIALIZED,DONE)) return false;104new (&child) Task(closure, this, context);105return true;106}107108/*! run this task */109dll_export void run(Thread& thread);110111void run_internal(Thread& thread);112113public:114std::atomic<int> state; //!< state this task is in115std::atomic<int> dependencies; //!< dependencies to wait for116std::atomic<bool> stealable; //!< true if task can be stolen117TaskFunction* closure; //!< the closure to execute118Task* parent; //!< parent task to signal when we are finished119TaskGroupContext* context;120size_t stackPtr; //!< stack location where closure is stored121size_t N; //!< approximative size of task122};123124struct TaskQueue125{126TaskQueue ()127: left(0), right(0), stackPtr(0) {}128129__forceinline void* alloc(size_t bytes, size_t align = 64)130{131size_t ofs = bytes + ((align - stackPtr) & (align-1));132if (stackPtr + ofs > CLOSURE_STACK_SIZE)133abort(); //throw std::runtime_error("closure stack overflow");134stackPtr += ofs;135return &stack[stackPtr-bytes];136}137138template<typename Closure>139__forceinline void push_right(Thread& thread, const size_t size, 
const Closure& closure, TaskGroupContext* context)140{141if (right >= TASK_STACK_SIZE)142abort(); //throw std::runtime_error("task stack overflow");143144/* allocate new task on right side of stack */145size_t oldStackPtr = stackPtr;146TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);147new (&tasks[right.load()]) Task(func,thread.task,context,oldStackPtr,size);148right++;149150/* also move left pointer */151if (left >= right-1) left = right-1;152}153154dll_export bool execute_local(Thread& thread, Task* parent);155bool execute_local_internal(Thread& thread, Task* parent);156bool steal(Thread& thread);157size_t getTaskSizeAtLeft();158159bool empty() { return right == 0; }160161public:162163/* task stack */164Task tasks[TASK_STACK_SIZE];165__aligned(64) std::atomic<size_t> left; //!< threads steal from left166__aligned(64) std::atomic<size_t> right; //!< new tasks are added to the right167168/* closure stack */169__aligned(64) char stack[CLOSURE_STACK_SIZE];170size_t stackPtr;171};172173/*! thread local structure for each thread */174struct Thread175{176ALIGNED_STRUCT_(64);177178Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)179: threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}180181__forceinline size_t threadCount() {182return scheduler->threadCounter;183}184185size_t threadIndex; //!< ID of this thread186TaskQueue tasks; //!< local task queue187Task* task; //!< current active task188Ref<TaskScheduler> scheduler; //!< pointer to task scheduler189};190191/*! pool of worker threads */192struct ThreadPool193{194ThreadPool (bool set_affinity);195~ThreadPool ();196197/*! starts the threads */198dll_export void startThreads();199200/*! sets number of threads to use */201void setNumThreads(size_t numThreads, bool startThreads = false);202203/*! adds a task scheduler object for scheduling */204dll_export void add(const Ref<TaskScheduler>& scheduler);205206/*! 
remove the task scheduler object again */207dll_export void remove(const Ref<TaskScheduler>& scheduler);208209/*! returns number of threads of the thread pool */210size_t size() const { return numThreads; }211212/*! main loop for all threads */213void thread_loop(size_t threadIndex);214215private:216std::atomic<size_t> numThreads;217std::atomic<size_t> numThreadsRunning;218bool set_affinity;219std::atomic<bool> running;220std::vector<thread_t> threads;221222private:223MutexSys mutex;224ConditionSys condition;225std::list<Ref<TaskScheduler> > schedulers;226};227228TaskScheduler ();229~TaskScheduler ();230231/*! initializes the task scheduler */232static void create(size_t numThreads, bool set_affinity, bool start_threads);233234/*! destroys the task scheduler again */235static void destroy();236237/*! lets new worker threads join the tasking system */238void join();239void reset();240241/*! let a worker thread allocate a thread index */242dll_export ssize_t allocThreadIndex();243244/*! wait for some number of threads available (threadCount includes main thread) */245void wait_for_threads(size_t threadCount);246247/*! thread loop for all worker threads */248void thread_loop(size_t threadIndex);249250/*! 
steals a task from a different thread */251bool steal_from_other_threads(Thread& thread);252253template<typename Predicate, typename Body>254static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);255256/* spawn a new task at the top of the threads task stack */257template<typename Closure>258void spawn_root(const Closure& closure, TaskGroupContext* context, size_t size = 1, bool useThreadPool = true)259{260if (useThreadPool) startThreads();261262size_t threadIndex = allocThreadIndex();263std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation264Thread& thread = *mthread;265assert(threadLocal[threadIndex].load() == nullptr);266threadLocal[threadIndex] = &thread;267Thread* oldThread = swapThread(&thread);268thread.tasks.push_right(thread,size,closure,context);269{270Lock<MutexSys> lock(mutex);271anyTasksRunning++;272hasRootTask = true;273condition.notify_all();274}275276if (useThreadPool) addScheduler(this);277278while (thread.tasks.execute_local(thread,nullptr));279anyTasksRunning--;280if (useThreadPool) removeScheduler(this);281282threadLocal[threadIndex] = nullptr;283swapThread(oldThread);284285/* remember exception to throw */286std::exception_ptr except = nullptr;287if (context->cancellingException != nullptr) except = context->cancellingException;288289/* wait for all threads to terminate */290threadCounter--;291while (threadCounter > 0) yield();292context->cancellingException = nullptr;293294/* re-throw proper exception */295if (except != nullptr) {296std::rethrow_exception(except);297}298}299300/* spawn a new task at the top of the threads task stack */301template<typename Closure>302static __forceinline void spawn(size_t size, const Closure& closure, TaskGroupContext* context)303{304Thread* thread = TaskScheduler::thread();305if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure,context);306else instance()->spawn_root(closure,context,size);307}308309/* spawn a new task 
at the top of the threads task stack */310template<typename Closure>311static __forceinline void spawn(const Closure& closure, TaskGroupContext* taskGroupContext) {312spawn(1,closure,taskGroupContext);313}314315/* spawn a new task set */316template<typename Index, typename Closure>317static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure, TaskGroupContext* context)318{319spawn(end-begin, [=]()320{321if (end-begin <= blockSize) {322return closure(range<Index>(begin,end));323}324const Index center = (begin+end)/2;325spawn(begin,center,blockSize,closure,context);326spawn(center,end ,blockSize,closure,context);327wait();328},context);329}330331/* work on spawned subtasks and wait until all have finished */332dll_export static void wait();333334/* returns the ID of the current thread */335dll_export static size_t threadID();336337/* returns the index (0..threadCount-1) of the current thread */338dll_export static size_t threadIndex();339340/* returns the total number of threads */341dll_export static size_t threadCount();342343private:344345/* returns the thread local task list of this worker thread */346dll_export static Thread* thread();347348/* sets the thread local task list of this worker thread */349dll_export static Thread* swapThread(Thread* thread);350351/*! returns the taskscheduler object to be used by the master thread */352dll_export static TaskScheduler* instance();353354/*! starts the threads */355dll_export static void startThreads();356357/*! adds a task scheduler object for scheduling */358dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);359360/*! 
remove the task scheduler object again */361dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);362363private:364std::vector<atomic<Thread*>> threadLocal;365std::atomic<size_t> threadCounter;366std::atomic<size_t> anyTasksRunning;367std::atomic<bool> hasRootTask;368MutexSys mutex;369ConditionSys condition;370371private:372static size_t g_numThreads;373static __thread TaskScheduler* g_instance;374static __thread Thread* thread_local_thread;375static ThreadPool* threadPool;376};377378RTC_NAMESPACE_END379380#if defined(RTC_NAMESPACE)381using RTC_NAMESPACE::TaskScheduler;382#endif383}384385386