GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.h

// Copyright 2009-2021 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "../../include/embree4/rtcore.h"
#include "../sys/platform.h"
#include "../sys/alloc.h"
#include "../sys/barrier.h"
#include "../sys/thread.h"
#include "../sys/mutex.h"
#include "../sys/condition.h"
#include "../sys/ref.h"
#include "../sys/atomic.h"
#include "../math/range.h"

#include <list>

namespace embree
{

  /* The tasking system exports some symbols to be used by the tutorials. Thus we
     also hide it in the API namespace when requested. */
  RTC_NAMESPACE_BEGIN

  struct TaskScheduler : public RefCount
  {
    ALIGNED_STRUCT_(64);
    friend class Device;

    static const size_t TASK_STACK_SIZE = 4*1024;        //!< task structure stack
    static const size_t CLOSURE_STACK_SIZE = 512*1024;   //!< stack for task closures

    struct Thread;

    /*! virtual interface for all tasks */
    struct TaskFunction {
      virtual void execute() = 0;
    };

    struct TaskGroupContext {
      TaskGroupContext() : cancellingException(nullptr) {}

      std::exception_ptr cancellingException;
    };
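
    /* The cancelling exception recorded above is rethrown by spawn_root() further below
       once all spawned work has finished; it is presumably set when a task's closure
       throws (the definitions live in the accompanying .cpp file). */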

    /*! builds a task interface from a closure */
    template<typename Closure>
    struct ClosureTaskFunction : public TaskFunction
    {
      Closure closure;
      __forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}
      void execute() { closure(); };
    };

    struct __aligned(64) Task
    {
      /*! states a task can be in */
      enum { DONE, INITIALIZED };

      /*! switch from one state to another */
      __forceinline void switch_state(int from, int to)
      {
        __memory_barrier();
        MAYBE_UNUSED bool success = state.compare_exchange_strong(from,to);
        assert(success);
      }

      /*! try to switch from one state to another */
      __forceinline bool try_switch_state(int from, int to) {
        __memory_barrier();
        return state.compare_exchange_strong(from,to);
      }

      /*! increment/decrement dependency counter */
      void add_dependencies(int n) {
        dependencies+=n;
      }

      /*! initialize all tasks to DONE state by default */
      __forceinline Task()
        : state(DONE) {}

      /*! construction of new task */
      __forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context, size_t stackPtr, size_t N)
        : dependencies(1), stealable(true), closure(closure), parent(parent), context(context), stackPtr(stackPtr), N(N)
      {
        if (parent) parent->add_dependencies(+1);
        switch_state(DONE,INITIALIZED);
      }

      /*! construction of stolen task, stealing thread will decrement initial dependency */
      __forceinline Task (TaskFunction* closure, Task* parent, TaskGroupContext* context)
        : dependencies(1), stealable(false), closure(closure), parent(parent), context(context), stackPtr(-1), N(1)
      {
        switch_state(DONE,INITIALIZED);
      }

      /*! try to steal this task */
      bool try_steal(Task& child)
      {
        if (!stealable) return false;
        if (!try_switch_state(INITIALIZED,DONE)) return false;
        new (&child) Task(closure, this, context);
        return true;
      }

      /*! run this task */
      dll_export void run(Thread& thread);

      void run_internal(Thread& thread);

    public:
      std::atomic<int> state;          //!< state this task is in
      std::atomic<int> dependencies;   //!< dependencies to wait for
      std::atomic<bool> stealable;     //!< true if task can be stolen
      TaskFunction* closure;           //!< the closure to execute
      Task* parent;                    //!< parent task to signal when we are finished
      TaskGroupContext* context;
      size_t stackPtr;                 //!< stack location where closure is stored
      size_t N;                        //!< approximative size of task
    };
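
    /* Lifecycle sketch, inferred from the members above: a task starts in DONE, moves to
       INITIALIZED on construction, and registers itself as a dependency of its parent.
       try_steal() flips a stealable task back to DONE and re-creates it in the thief's
       Task slot; run()/run_internal() presumably execute the closure and signal the parent
       by decrementing its dependency counter (definitions in the accompanying .cpp file). */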

    struct TaskQueue
    {
      TaskQueue ()
        : left(0), right(0), stackPtr(0) {}

      __forceinline void* alloc(size_t bytes, size_t align = 64)
      {
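        /* Round the current stack offset up to the next multiple of 'align' (a power of
           two): (align - stackPtr) & (align-1) is the padding needed, so the block of
           'bytes' returned below starts at an aligned offset within the closure stack.
           Example: stackPtr = 65, align = 64, bytes = 32 -> padding 63, new stackPtr = 160,
           returned offset 128. */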
        size_t ofs = bytes + ((align - stackPtr) & (align-1));
        if (stackPtr + ofs > CLOSURE_STACK_SIZE)
          abort(); //throw std::runtime_error("closure stack overflow");
        stackPtr += ofs;
        return &stack[stackPtr-bytes];
      }

      template<typename Closure>
      __forceinline void push_right(Thread& thread, const size_t size, const Closure& closure, TaskGroupContext* context)
      {
        if (right >= TASK_STACK_SIZE)
          abort(); //throw std::runtime_error("task stack overflow");

        /* allocate new task on right side of stack */
        size_t oldStackPtr = stackPtr;
        TaskFunction* func = new (alloc(sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);
        new (&tasks[right.load()]) Task(func,thread.task,context,oldStackPtr,size);
        right++;

        /* also move left pointer */
        if (left >= right-1) left = right-1;
      }

      dll_export bool execute_local(Thread& thread, Task* parent);
      bool execute_local_internal(Thread& thread, Task* parent);
      bool steal(Thread& thread);
      size_t getTaskSizeAtLeft();

      bool empty() { return right == 0; }

    public:

      /* task stack */
      Task tasks[TASK_STACK_SIZE];
      __aligned(64) std::atomic<size_t> left;   //!< threads steal from left
      __aligned(64) std::atomic<size_t> right;  //!< new tasks are added to the right

      /* closure stack */
      __aligned(64) char stack[CLOSURE_STACK_SIZE];
      size_t stackPtr;
    };
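
    /* The queue behaves like a work-stealing deque: the owning thread adds new tasks at
       the right end, idle threads steal from the left, and both the Task records and the
       closure payloads live in the fixed-size per-thread arrays above, so pushing a task
       involves no heap allocation. */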

    /*! thread local structure for each thread */
    struct Thread
    {
      ALIGNED_STRUCT_(64);

      Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)
        : threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}

      __forceinline size_t threadCount() {
        return scheduler->threadCounter;
      }

      size_t threadIndex;            //!< ID of this thread
      TaskQueue tasks;               //!< local task queue
      Task* task;                    //!< current active task
      Ref<TaskScheduler> scheduler;  //!< pointer to task scheduler
    };

    /*! pool of worker threads */
    struct ThreadPool
    {
      ThreadPool (bool set_affinity);
      ~ThreadPool ();

      /*! starts the threads */
      dll_export void startThreads();

      /*! sets number of threads to use */
      void setNumThreads(size_t numThreads, bool startThreads = false);

      /*! adds a task scheduler object for scheduling */
      dll_export void add(const Ref<TaskScheduler>& scheduler);

      /*! remove the task scheduler object again */
      dll_export void remove(const Ref<TaskScheduler>& scheduler);

      /*! returns number of threads of the thread pool */
      size_t size() const { return numThreads; }

      /*! main loop for all threads */
      void thread_loop(size_t threadIndex);

    private:
      std::atomic<size_t> numThreads;
      std::atomic<size_t> numThreadsRunning;
      bool set_affinity;
      std::atomic<bool> running;
      std::vector<thread_t> threads;

    private:
      MutexSys mutex;
      ConditionSys condition;
      std::list<Ref<TaskScheduler> > schedulers;
    };

    TaskScheduler ();
    ~TaskScheduler ();

    /*! initializes the task scheduler */
    static void create(size_t numThreads, bool set_affinity, bool start_threads);

    /*! destroys the task scheduler again */
    static void destroy();

    /*! lets new worker threads join the tasking system */
    void join();
    void reset();

    /*! let a worker thread allocate a thread index */
    dll_export ssize_t allocThreadIndex();

    /*! wait for some number of threads to be available (threadCount includes the main thread) */
    void wait_for_threads(size_t threadCount);

    /*! thread loop for all worker threads */
    void thread_loop(size_t threadIndex);

    /*! steals a task from a different thread */
    bool steal_from_other_threads(Thread& thread);

    template<typename Predicate, typename Body>
    static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);

    /* spawn a new task at the top of the thread's task stack */
    template<typename Closure>
    void spawn_root(const Closure& closure, TaskGroupContext* context, size_t size = 1, bool useThreadPool = true)
    {
      if (useThreadPool) startThreads();

      size_t threadIndex = allocThreadIndex();
      std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
      Thread& thread = *mthread;
      assert(threadLocal[threadIndex].load() == nullptr);
      threadLocal[threadIndex] = &thread;
      Thread* oldThread = swapThread(&thread);
      thread.tasks.push_right(thread,size,closure,context);
      {
        Lock<MutexSys> lock(mutex);
        anyTasksRunning++;
        hasRootTask = true;
        condition.notify_all();
      }

      if (useThreadPool) addScheduler(this);

      while (thread.tasks.execute_local(thread,nullptr));
      anyTasksRunning--;
      if (useThreadPool) removeScheduler(this);

      threadLocal[threadIndex] = nullptr;
      swapThread(oldThread);

      /* remember exception to throw */
      std::exception_ptr except = nullptr;
      if (context->cancellingException != nullptr) except = context->cancellingException;

      /* wait for all threads to terminate */
      threadCounter--;
      while (threadCounter > 0) yield();
      context->cancellingException = nullptr;

      /* re-throw proper exception */
      if (except != nullptr) {
        std::rethrow_exception(except);
      }
    }

    /* spawn a new task at the top of the thread's task stack */
    template<typename Closure>
    static __forceinline void spawn(size_t size, const Closure& closure, TaskGroupContext* context)
    {
      Thread* thread = TaskScheduler::thread();
      if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure,context);
      else instance()->spawn_root(closure,context,size);
    }
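
    /* If the caller is already a worker of this scheduler, the task is simply pushed onto
       its local queue; otherwise spawn_root() above turns the calling thread into a
       temporary worker for the duration of the root task. */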
309
310
/* spawn a new task at the top of the threads task stack */
311
template<typename Closure>
312
static __forceinline void spawn(const Closure& closure, TaskGroupContext* taskGroupContext) {
313
spawn(1,closure,taskGroupContext);
314
}
315
316
/* spawn a new task set */
317
template<typename Index, typename Closure>
318
static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure, TaskGroupContext* context)
319
{
320
spawn(end-begin, [=]()
321
{
322
if (end-begin <= blockSize) {
323
return closure(range<Index>(begin,end));
324
}
325
const Index center = (begin+end)/2;
326
spawn(begin,center,blockSize,closure,context);
327
spawn(center,end ,blockSize,closure,context);
328
wait();
329
},context);
330
}
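
    /* Usage sketch (illustrative only, not part of the upstream header): a blocked
       parallel loop over [0,N) built on the task-set overload above might look like

         TaskScheduler::TaskGroupContext context;
         TaskScheduler::spawn(size_t(0), N, size_t(1024), [&] (const range<size_t>& r) {
           for (size_t i=r.begin(); i<r.end(); i++) body(i);
         }, &context);
         TaskScheduler::wait();

       where N, body and the block size of 1024 are placeholders chosen for the example. */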

    /* work on spawned subtasks and wait until all have finished */
    dll_export static void wait();

    /* returns the ID of the current thread */
    dll_export static size_t threadID();

    /* returns the index (0..threadCount-1) of the current thread */
    dll_export static size_t threadIndex();

    /* returns the total number of threads */
    dll_export static size_t threadCount();

  private:

    /* returns the thread local task list of this worker thread */
    dll_export static Thread* thread();

    /* sets the thread local task list of this worker thread */
    dll_export static Thread* swapThread(Thread* thread);

    /*! returns the task scheduler object to be used by the master thread */
    dll_export static TaskScheduler* instance();

    /*! starts the threads */
    dll_export static void startThreads();

    /*! adds a task scheduler object for scheduling */
    dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);

    /*! remove the task scheduler object again */
    dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);

  private:
    std::vector<atomic<Thread*>> threadLocal;
    std::atomic<size_t> threadCounter;
    std::atomic<size_t> anyTasksRunning;
    std::atomic<bool> hasRootTask;
    MutexSys mutex;
    ConditionSys condition;

  private:
    static size_t g_numThreads;
    static __thread TaskScheduler* g_instance;
    static __thread Thread* thread_local_thread;
    static ThreadPool* threadPool;
  };
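
  /* Typical lifetime, pieced together from the declarations above (a sketch, not
     normative): TaskScheduler::create(numThreads,set_affinity,start_threads) sets up the
     shared thread pool once, TaskScheduler::spawn(...) and TaskScheduler::wait() are used
     from application or worker threads to run task trees, and TaskScheduler::destroy()
     tears the pool down again at shutdown. */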

  RTC_NAMESPACE_END

#if defined(RTC_NAMESPACE)
  using RTC_NAMESPACE::TaskScheduler;
#endif
}