GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.h
// Copyright 2009-2021 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "../../include/embree4/rtcore.h"
#include "../sys/platform.h"
#include "../sys/alloc.h"
#include "../sys/barrier.h"
#include "../sys/thread.h"
#include "../sys/mutex.h"
#include "../sys/condition.h"
#include "../sys/ref.h"
#include "../sys/atomic.h"
#include "../math/range.h"

#include <exception>
#include <list>

namespace embree
{

  /* The tasking system exports some symbols to be used by the tutorials.
     Thus we also hide it in the API namespace when requested. */
  RTC_NAMESPACE_BEGIN

  struct TaskScheduler : public RefCount
  {
    ALIGNED_STRUCT_(64);
    friend class Device;

    static const size_t TASK_STACK_SIZE = 4*1024;        //!< task structure stack
    static const size_t CLOSURE_STACK_SIZE = 512*1024;   //!< stack for task closures

    struct Thread;

    /*! virtual interface for all tasks */
    struct TaskFunction {
      virtual void execute() = 0;
    };

    struct TaskGroupContext {
      TaskGroupContext() : cancellingException(nullptr) {}

      std::exception_ptr cancellingException;
    };

    /*! builds a task interface from a closure */
    template<typename Closure>
    struct ClosureTaskFunction : public TaskFunction
    {
      Closure closure;
      __forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}
      void execute() { closure(); }
    };
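
    /* Editor's sketch (not part of the original header): ClosureTaskFunction
       type-erases an arbitrary lambda behind the TaskFunction interface, so
       the scheduler can run any closure through one virtual call. Assuming a
       TaskQueue `q` available for storage:

         auto work = [] { std::puts("task body"); };
         using Fn = ClosureTaskFunction<decltype(work)>;
         TaskFunction* f = new (q.alloc(sizeof(Fn))) Fn(work);
         f->execute(); // invokes the lambda
    */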

    struct __aligned(64) Task
    {
      /*! states a task can be in */
      enum { DONE, INITIALIZED };

      /*! switch from one state to another */
      __forceinline void switch_state(int from, int to)
      {
        __memory_barrier();
        MAYBE_UNUSED bool success = state.compare_exchange_strong(from,to);
        assert(success);
      }

      /*! try to switch from one state to another */
      __forceinline bool try_switch_state(int from, int to) {
        __memory_barrier();
        return state.compare_exchange_strong(from,to);
      }

      /*! increment/decrement dependency counter */
      void add_dependencies(int n) {
        dependencies+=n;
      }

      /*! initialize all tasks to DONE state by default */
      __forceinline Task()
        : state(DONE) {}

      /*! construction of new task */
      __forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context, size_t stackPtr, size_t N)
        : dependencies(1), stealable(true), closure(closure), parent(parent), context(context), stackPtr(stackPtr), N(N)
      {
        if (parent) parent->add_dependencies(+1);
        switch_state(DONE,INITIALIZED);
      }

      /*! construction of stolen task, stealing thread will decrement initial dependency */
      __forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context)
        : dependencies(1), stealable(false), closure(closure), parent(parent), context(context), stackPtr(-1), N(1)
      {
        switch_state(DONE,INITIALIZED);
      }

      /*! try to steal this task */
      bool try_steal(Task& child)
      {
        if (!stealable) return false;
        if (!try_switch_state(INITIALIZED,DONE)) return false;
        new (&child) Task(closure, this, context);
        return true;
      }

      /*! run this task */
      dll_export void run(Thread& thread);

      void run_internal(Thread& thread);

    public:
      std::atomic<int> state;          //!< state this task is in
      std::atomic<int> dependencies;   //!< dependencies to wait for
      std::atomic<bool> stealable;     //!< true if task can be stolen
      TaskFunction* closure;           //!< the closure to execute
      Task* parent;                    //!< parent task to signal when we are finished
      TaskGroupContext* context;
      size_t stackPtr;                 //!< stack location where closure is stored
      size_t N;                        //!< approximate size of task
    };
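
    /* Editor's note (illustrative sketch, not in the original source): a Task
       cycles DONE -> INITIALIZED at construction and back to DONE once stolen
       or finished. try_steal() first wins the INITIALIZED -> DONE CAS, then
       placement-news a non-stealable copy into the thief's storage; the
       original task becomes the parent whose `dependencies` counter the
       stolen child decrements on completion. A hypothetical trace:

         Task t(func, parentTask, ctx, 0, 1); // DONE -> INITIALIZED
         Task child;                          // default-constructed as DONE
         if (t.try_steal(child)) {
           // t is DONE again; running `child` eventually signals t
         }
    */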

    struct TaskQueue
    {
      TaskQueue ()
        : left(0), right(0), stackPtr(0) {}

      __forceinline void* alloc(size_t bytes, size_t align = 64)
      {
        size_t ofs = bytes + ((align - stackPtr) & (align-1));
        if (stackPtr + ofs > CLOSURE_STACK_SIZE)
          abort(); //throw std::runtime_error("closure stack overflow");
        stackPtr += ofs;
        return &stack[stackPtr-bytes];
      }
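
      /* Worked example (editor's addition): with stackPtr == 10 and align == 64,
         the padding term (align - stackPtr) & (align-1) is (64-10) & 63 == 54,
         so ofs = bytes + 54 and the returned pointer &stack[64] is 64-byte
         aligned. The padding is charged to the current allocation, which is
         why stackPtr - bytes always lands on an aligned offset. */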

      template<typename Closure>
      __forceinline void push_right(Thread& thread, const size_t size, const Closure& closure, TaskGroupContext* context)
      {
        if (right >= TASK_STACK_SIZE)
          abort(); //throw std::runtime_error("task stack overflow");

        /* allocate new task on right side of stack */
        size_t oldStackPtr = stackPtr;
        TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);
        new (&tasks[right.load()]) Task(func,thread.task,context,oldStackPtr,size);
        right++;

        /* also move left pointer */
        if (left >= right-1) left = right-1;
      }
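
      /* Editor's note: this is the usual work-stealing split of a deque. The
         owning thread pushes and executes at `right` (LIFO, cache-hot), while
         idle threads steal from `left` (typically the largest remaining
         subtask). Owner-side sketch, assuming a Thread `t`, a TaskGroupContext
         `ctx`, and a hypothetical user function doSomething():

           t.tasks.push_right(t, 1, [] { doSomething(); }, &ctx);
           // thieves concurrently call steal(), which takes at `left`
      */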

      dll_export bool execute_local(Thread& thread, Task* parent);
      bool execute_local_internal(Thread& thread, Task* parent);
      bool steal(Thread& thread);
      size_t getTaskSizeAtLeft();

      bool empty() { return right == 0; }

    public:

      /* task stack */
      Task tasks[TASK_STACK_SIZE];
      __aligned(64) std::atomic<size_t> left;   //!< threads steal from left
      __aligned(64) std::atomic<size_t> right;  //!< new tasks are added to the right

      /* closure stack */
      __aligned(64) char stack[CLOSURE_STACK_SIZE];
      size_t stackPtr;
    };

    /*! thread local structure for each thread */
    struct Thread
    {
      ALIGNED_STRUCT_(64);

      Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)
        : threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}

      __forceinline size_t threadCount() {
        return scheduler->threadCounter;
      }

      size_t threadIndex;             //!< ID of this thread
      TaskQueue tasks;                //!< local task queue
      Task* task;                     //!< current active task
      Ref<TaskScheduler> scheduler;   //!< pointer to task scheduler
    };

    /*! pool of worker threads */
    struct ThreadPool
    {
      ThreadPool (bool set_affinity);
      ~ThreadPool ();

      /*! starts the threads */
      dll_export void startThreads();

      /*! sets number of threads to use */
      void setNumThreads(size_t numThreads, bool startThreads = false);

      /*! adds a task scheduler object for scheduling */
      dll_export void add(const Ref<TaskScheduler>& scheduler);

      /*! removes the task scheduler object */
      dll_export void remove(const Ref<TaskScheduler>& scheduler);

      /*! returns number of threads of the thread pool */
      size_t size() const { return numThreads; }

      /*! main loop for all threads */
      void thread_loop(size_t threadIndex);

    private:
      std::atomic<size_t> numThreads;
      std::atomic<size_t> numThreadsRunning;
      bool set_affinity;
      std::atomic<bool> running;
      std::vector<thread_t> threads;

    private:
      MutexSys mutex;
      ConditionSys condition;
      std::list<Ref<TaskScheduler> > schedulers;
    };

    TaskScheduler ();
    ~TaskScheduler ();

    /*! initializes the task scheduler */
    static void create(size_t numThreads, bool set_affinity, bool start_threads);

    /*! destroys the task scheduler */
    static void destroy();
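
    /* Usage sketch (editor's addition; the thread count and flags are
       illustrative): a host application brings the scheduler up once, issues
       work, and tears it down again:

         TaskScheduler::create(8, true, true); // 8 threads, pin them, start now
         // ... spawn()/wait() from the master thread ...
         TaskScheduler::destroy();
    */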

    /*! lets new worker threads join the tasking system */
    void join();
    void reset();

    /*! lets a worker thread allocate a thread index */
    dll_export ssize_t allocThreadIndex();

    /*! waits until the given number of threads is available (threadCount includes the main thread) */
    void wait_for_threads(size_t threadCount);

    /*! thread loop for all worker threads */
    void thread_loop(size_t threadIndex);

    /*! steals a task from a different thread */
    bool steal_from_other_threads(Thread& thread);

    template<typename Predicate, typename Body>
    static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);

    /* spawn a new task at the top of the thread's task stack */
    template<typename Closure>
    void spawn_root(const Closure& closure, TaskGroupContext* context, size_t size = 1, bool useThreadPool = true)
    {
      if (useThreadPool) startThreads();

      size_t threadIndex = allocThreadIndex();
      std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
      Thread& thread = *mthread;
      assert(threadLocal[threadIndex].load() == nullptr);
      threadLocal[threadIndex] = &thread;
      Thread* oldThread = swapThread(&thread);
      thread.tasks.push_right(thread,size,closure,context);
      {
        Lock<MutexSys> lock(mutex);
        anyTasksRunning++;
        hasRootTask = true;
        condition.notify_all();
      }

      if (useThreadPool) addScheduler(this);

      while (thread.tasks.execute_local(thread,nullptr));
      anyTasksRunning--;
      if (useThreadPool) removeScheduler(this);

      threadLocal[threadIndex] = nullptr;
      swapThread(oldThread);

      /* remember exception to throw */
      std::exception_ptr except = nullptr;
      if (context->cancellingException != nullptr) except = context->cancellingException;

      /* wait for all threads to terminate */
      threadCounter--;
      while (threadCounter > 0) yield();
      context->cancellingException = nullptr;

      /* re-throw proper exception */
      if (except != nullptr) {
        std::rethrow_exception(except);
      }
    }

    /* spawn a new task at the top of the thread's task stack */
    template<typename Closure>
    static __forceinline void spawn(size_t size, const Closure& closure, TaskGroupContext* context)
    {
      Thread* thread = TaskScheduler::thread();
      if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure,context);
      else instance()->spawn_root(closure,context,size);
    }

    /* spawn a new task at the top of the thread's task stack */
    template<typename Closure>
    static __forceinline void spawn(const Closure& closure, TaskGroupContext* taskGroupContext) {
      spawn(1,closure,taskGroupContext);
    }
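
    /* Usage sketch (editor's addition): from a worker thread spawn() only
       pushes onto the local queue; from any other thread it falls back to
       spawn_root(), which executes until the task tree is finished. A minimal
       fork-join pair, with `doWork` standing in for user code:

         TaskScheduler::TaskGroupContext ctx;
         TaskScheduler::spawn([&] { doWork(); }, &ctx);
         TaskScheduler::wait(); // help execute spawned subtasks until done
    */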

    /* spawn a new task set */
    template<typename Index, typename Closure>
    static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure, TaskGroupContext* context)
    {
      spawn(end-begin, [=]()
      {
        if (end-begin <= blockSize) {
          return closure(range<Index>(begin,end));
        }
        const Index center = (begin+end)/2;
        spawn(begin,center,blockSize,closure,context);
        spawn(center,end  ,blockSize,closure,context);
        wait();
      },context);
    }
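
    /* Usage sketch (editor's addition): the ranged overload recursively halves
       [begin,end) until a block is at most blockSize, then runs the closure on
       that subrange; combined with wait() this gives a simple parallel-for.
       For example, 1024 elements in blocks of at most 128:

         TaskScheduler::TaskGroupContext ctx;
         TaskScheduler::spawn(size_t(0), size_t(1024), size_t(128),
           [](const range<size_t>& r) {
             for (size_t i = r.begin(); i < r.end(); i++) { } // per-element work
           }, &ctx);
         TaskScheduler::wait();
    */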

    /* work on spawned subtasks and wait until all have finished */
    dll_export static void wait();

    /* returns the ID of the current thread */
    dll_export static size_t threadID();

    /* returns the index (0..threadCount-1) of the current thread */
    dll_export static size_t threadIndex();

    /* returns the total number of threads */
    dll_export static size_t threadCount();

  private:

    /* returns the thread local task list of this worker thread */
    dll_export static Thread* thread();

    /* sets the thread local task list of this worker thread */
    dll_export static Thread* swapThread(Thread* thread);

    /*! returns the TaskScheduler object to be used by the master thread */
    dll_export static TaskScheduler* instance();

    /*! starts the threads */
    dll_export static void startThreads();

    /*! adds a task scheduler object for scheduling */
    dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);

    /*! removes the task scheduler object */
    dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);

  private:
    std::vector<atomic<Thread*>> threadLocal;
    std::atomic<size_t> threadCounter;
    std::atomic<size_t> anyTasksRunning;
    std::atomic<bool> hasRootTask;
    MutexSys mutex;
    ConditionSys condition;

  private:
    static size_t g_numThreads;
    static __thread TaskScheduler* g_instance;
    static __thread Thread* thread_local_thread;
    static ThreadPool* threadPool;
  };

  RTC_NAMESPACE_END

#if defined(RTC_NAMESPACE)
  using RTC_NAMESPACE::TaskScheduler;
#endif
}