// Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.h
// 21661 views
// Copyright 2009-2021 Intel Corporation1// SPDX-License-Identifier: Apache-2.023#pragma once45#include "../../include/embree4/rtcore.h"6#include "../sys/platform.h"7#include "../sys/alloc.h"8#include "../sys/barrier.h"9#include "../sys/thread.h"10#include "../sys/mutex.h"11#include "../sys/condition.h"12#include "../sys/ref.h"13#include "../sys/atomic.h"14#include "../math/range.h"1516#include <exception>17#include <list>1819namespace embree20{2122/* The tasking system exports some symbols to be used by the tutorials. Thus we23hide is also in the API namespace when requested. */24RTC_NAMESPACE_BEGIN2526struct TaskScheduler : public RefCount27{28ALIGNED_STRUCT_(64);29friend class Device;3031static const size_t TASK_STACK_SIZE = 4*1024; //!< task structure stack32static const size_t CLOSURE_STACK_SIZE = 512*1024; //!< stack for task closures3334struct Thread;3536/*! virtual interface for all tasks */37struct TaskFunction {38virtual void execute() = 0;39};404142struct TaskGroupContext {43TaskGroupContext() : cancellingException(nullptr) {}4445std::exception_ptr cancellingException;46};4748/*! builds a task interface from a closure */49template<typename Closure>50struct ClosureTaskFunction : public TaskFunction51{52Closure closure;53__forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}54void execute() { closure(); };55};5657struct __aligned(64) Task58{59/*! states a task can be in */60enum { DONE, INITIALIZED };6162/*! switch from one state to another */63__forceinline void switch_state(int from, int to)64{65__memory_barrier();66MAYBE_UNUSED bool success = state.compare_exchange_strong(from,to);67assert(success);68}6970/*! try to switch from one state to another */71__forceinline bool try_switch_state(int from, int to) {72__memory_barrier();73return state.compare_exchange_strong(from,to);74}7576/*! increment/decrement dependency counter */77void add_dependencies(int n) {78dependencies+=n;79}8081/*! 
initialize all tasks to DONE state by default */82__forceinline Task()83: state(DONE) {}8485/*! construction of new task */86__forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context, size_t stackPtr, size_t N)87: dependencies(1), stealable(true), closure(closure), parent(parent), context(context), stackPtr(stackPtr), N(N)88{89if (parent) parent->add_dependencies(+1);90switch_state(DONE,INITIALIZED);91}9293/*! construction of stolen task, stealing thread will decrement initial dependency */94__forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context)95: dependencies(1), stealable(false), closure(closure), parent(parent), context(context), stackPtr(-1), N(1)96{97switch_state(DONE,INITIALIZED);98}99100/*! try to steal this task */101bool try_steal(Task& child)102{103if (!stealable) return false;104if (!try_switch_state(INITIALIZED,DONE)) return false;105new (&child) Task(closure, this, context);106return true;107}108109/*! run this task */110dll_export void run(Thread& thread);111112void run_internal(Thread& thread);113114public:115std::atomic<int> state; //!< state this task is in116std::atomic<int> dependencies; //!< dependencies to wait for117std::atomic<bool> stealable; //!< true if task can be stolen118TaskFunction* closure; //!< the closure to execute119Task* parent; //!< parent task to signal when we are finished120TaskGroupContext* context;121size_t stackPtr; //!< stack location where closure is stored122size_t N; //!< approximative size of task123};124125struct TaskQueue126{127TaskQueue ()128: left(0), right(0), stackPtr(0) {}129130__forceinline void* alloc(size_t bytes, size_t align = 64)131{132size_t ofs = bytes + ((align - stackPtr) & (align-1));133if (stackPtr + ofs > CLOSURE_STACK_SIZE)134abort(); //throw std::runtime_error("closure stack overflow");135stackPtr += ofs;136return &stack[stackPtr-bytes];137}138139template<typename Closure>140__forceinline void push_right(Thread& thread, const size_t size, 
const Closure& closure, TaskGroupContext* context)141{142if (right >= TASK_STACK_SIZE)143abort(); //throw std::runtime_error("task stack overflow");144145/* allocate new task on right side of stack */146size_t oldStackPtr = stackPtr;147TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);148new (&tasks[right.load()]) Task(func,thread.task,context,oldStackPtr,size);149right++;150151/* also move left pointer */152if (left >= right-1) left = right-1;153}154155dll_export bool execute_local(Thread& thread, Task* parent);156bool execute_local_internal(Thread& thread, Task* parent);157bool steal(Thread& thread);158size_t getTaskSizeAtLeft();159160bool empty() { return right == 0; }161162public:163164/* task stack */165Task tasks[TASK_STACK_SIZE];166__aligned(64) std::atomic<size_t> left; //!< threads steal from left167__aligned(64) std::atomic<size_t> right; //!< new tasks are added to the right168169/* closure stack */170__aligned(64) char stack[CLOSURE_STACK_SIZE];171size_t stackPtr;172};173174/*! thread local structure for each thread */175struct Thread176{177ALIGNED_STRUCT_(64);178179Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)180: threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}181182__forceinline size_t threadCount() {183return scheduler->threadCounter;184}185186size_t threadIndex; //!< ID of this thread187TaskQueue tasks; //!< local task queue188Task* task; //!< current active task189Ref<TaskScheduler> scheduler; //!< pointer to task scheduler190};191192/*! pool of worker threads */193struct ThreadPool194{195ThreadPool (bool set_affinity);196~ThreadPool ();197198/*! starts the threads */199dll_export void startThreads();200201/*! sets number of threads to use */202void setNumThreads(size_t numThreads, bool startThreads = false);203204/*! adds a task scheduler object for scheduling */205dll_export void add(const Ref<TaskScheduler>& scheduler);206207/*! 
remove the task scheduler object again */208dll_export void remove(const Ref<TaskScheduler>& scheduler);209210/*! returns number of threads of the thread pool */211size_t size() const { return numThreads; }212213/*! main loop for all threads */214void thread_loop(size_t threadIndex);215216private:217std::atomic<size_t> numThreads;218std::atomic<size_t> numThreadsRunning;219bool set_affinity;220std::atomic<bool> running;221std::vector<thread_t> threads;222223private:224MutexSys mutex;225ConditionSys condition;226std::list<Ref<TaskScheduler> > schedulers;227};228229TaskScheduler ();230~TaskScheduler ();231232/*! initializes the task scheduler */233static void create(size_t numThreads, bool set_affinity, bool start_threads);234235/*! destroys the task scheduler again */236static void destroy();237238/*! lets new worker threads join the tasking system */239void join();240void reset();241242/*! let a worker thread allocate a thread index */243dll_export ssize_t allocThreadIndex();244245/*! wait for some number of threads available (threadCount includes main thread) */246void wait_for_threads(size_t threadCount);247248/*! thread loop for all worker threads */249void thread_loop(size_t threadIndex);250251/*! 
steals a task from a different thread */252bool steal_from_other_threads(Thread& thread);253254template<typename Predicate, typename Body>255static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);256257/* spawn a new task at the top of the threads task stack */258template<typename Closure>259void spawn_root(const Closure& closure, TaskGroupContext* context, size_t size = 1, bool useThreadPool = true)260{261if (useThreadPool) startThreads();262263size_t threadIndex = allocThreadIndex();264std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation265Thread& thread = *mthread;266assert(threadLocal[threadIndex].load() == nullptr);267threadLocal[threadIndex] = &thread;268Thread* oldThread = swapThread(&thread);269thread.tasks.push_right(thread,size,closure,context);270{271Lock<MutexSys> lock(mutex);272anyTasksRunning++;273hasRootTask = true;274condition.notify_all();275}276277if (useThreadPool) addScheduler(this);278279while (thread.tasks.execute_local(thread,nullptr));280anyTasksRunning--;281if (useThreadPool) removeScheduler(this);282283threadLocal[threadIndex] = nullptr;284swapThread(oldThread);285286/* remember exception to throw */287std::exception_ptr except = nullptr;288if (context->cancellingException != nullptr) except = context->cancellingException;289290/* wait for all threads to terminate */291threadCounter--;292while (threadCounter > 0) yield();293context->cancellingException = nullptr;294295/* re-throw proper exception */296if (except != nullptr) {297std::rethrow_exception(except);298}299}300301/* spawn a new task at the top of the threads task stack */302template<typename Closure>303static __forceinline void spawn(size_t size, const Closure& closure, TaskGroupContext* context)304{305Thread* thread = TaskScheduler::thread();306if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure,context);307else instance()->spawn_root(closure,context,size);308}309310/* spawn a new task 
at the top of the threads task stack */311template<typename Closure>312static __forceinline void spawn(const Closure& closure, TaskGroupContext* taskGroupContext) {313spawn(1,closure,taskGroupContext);314}315316/* spawn a new task set */317template<typename Index, typename Closure>318static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure, TaskGroupContext* context)319{320spawn(end-begin, [=]()321{322if (end-begin <= blockSize) {323return closure(range<Index>(begin,end));324}325const Index center = (begin+end)/2;326spawn(begin,center,blockSize,closure,context);327spawn(center,end ,blockSize,closure,context);328wait();329},context);330}331332/* work on spawned subtasks and wait until all have finished */333dll_export static void wait();334335/* returns the ID of the current thread */336dll_export static size_t threadID();337338/* returns the index (0..threadCount-1) of the current thread */339dll_export static size_t threadIndex();340341/* returns the total number of threads */342dll_export static size_t threadCount();343344private:345346/* returns the thread local task list of this worker thread */347dll_export static Thread* thread();348349/* sets the thread local task list of this worker thread */350dll_export static Thread* swapThread(Thread* thread);351352/*! returns the taskscheduler object to be used by the master thread */353dll_export static TaskScheduler* instance();354355/*! starts the threads */356dll_export static void startThreads();357358/*! adds a task scheduler object for scheduling */359dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);360361/*! 
remove the task scheduler object again */362dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);363364private:365std::vector<atomic<Thread*>> threadLocal;366std::atomic<size_t> threadCounter;367std::atomic<size_t> anyTasksRunning;368std::atomic<bool> hasRootTask;369MutexSys mutex;370ConditionSys condition;371372private:373static size_t g_numThreads;374static __thread TaskScheduler* g_instance;375static __thread Thread* thread_local_thread;376static ThreadPool* threadPool;377};378379RTC_NAMESPACE_END380381#if defined(RTC_NAMESPACE)382using RTC_NAMESPACE::TaskScheduler;383#endif384}385386387