GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/embree/common/tasking/taskschedulerinternal.cpp
// Copyright 2009-2021 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#include "taskschedulerinternal.h"
#include "../math/emath.h"
#include "../sys/sysinfo.h"
#include <algorithm>

namespace embree
{
  RTC_NAMESPACE_BEGIN

  static MutexSys g_mutex;
  size_t TaskScheduler::g_numThreads = 0;
  __thread TaskScheduler* TaskScheduler::g_instance = nullptr;
  std::vector<Ref<TaskScheduler>> g_instance_vector;
  __thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr;
  TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;

  template<typename Predicate, typename Body>
  __forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)
  {
    while (true)
    {
      /*! some rounds that yield */
      for (size_t i=0; i<32; i++)
      {
        /*! some spinning rounds */
        const size_t threadCount = thread.threadCount();
        for (size_t j=0; j<1024; j+=threadCount)
        {
          if (!pred()) return;
          if (thread.scheduler->steal_from_other_threads(thread)) {
            i=j=0;
            body();
          }
        }
        yield();
      }
    }
  }
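
  /* Back-off note: the inner loop above makes roughly 1024/threadCount steal
     attempts per round, the outer loop repeats this 32 times with a yield()
     between rounds, and both counters reset after every successful steal so a
     productive thread keeps spinning. The predicate is checked before each
     attempt, so the loop exits as soon as pred() turns false. */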

  /*! run this task */
  void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible
  {
    /* try to run if not already stolen */
    if (try_switch_state(INITIALIZED,DONE))
    {
      Task* prevTask = thread.task;
      thread.task = this;
      //try {
      //  if (context->cancellingException == nullptr)
          closure->execute();
      //} catch (...) {
      //  if (context->cancellingException == nullptr)
      //    context->cancellingException = std::current_exception();
      //}
      thread.task = prevTask;
      add_dependencies(-1);
    }

    /* steal until all dependencies have completed */
    steal_loop(thread,
               [&] () { return dependencies>0; },
               [&] () { while (thread.tasks.execute_local_internal(thread,this)); });

    /* now signal our parent task that we are finished */
    if (parent)
      parent->add_dependencies(-1);
  }
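
  /* A task body runs at most once: try_switch_state(INITIALIZED,DONE) is an
     atomic state transition, so a task that was already claimed elsewhere is
     skipped here, and the thread instead helps by stealing until the task's
     dependency counter reaches zero. */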

  /*! run this task */
  dll_export void TaskScheduler::Task::run (Thread& thread) {
    run_internal(thread);
  }

  bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)
  {
    /* stop if we run out of local tasks or reach the waiting task */
    if (right == 0 || &tasks[right-1] == parent)
      return false;

    /* execute task */
    size_t oldRight = right;
    tasks[right-1].run_internal(thread);
    if (right != oldRight) {
      THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");
    }

    /* pop task and closure from stack */
    right--;
    if (tasks[right].stackPtr != size_t(-1))
      stackPtr = tasks[right].stackPtr;

    /* also move left pointer */
    if (left >= right) left.store(right.load());

    return right != 0;
  }
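
  /* The queue is a work-stealing deque: the owning thread pushes and pops at
     the right end (LIFO), while thieves take from the left end (FIFO, see
     steal() below). Restoring stackPtr on pop appears to release the closure
     storage that was allocated for the completed task. */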

  dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {
    return execute_local_internal(thread,parent);
  }

  bool TaskScheduler::TaskQueue::steal(Thread& thread)
  {
    size_t l = left;
    size_t r = right;
    if (l < r)
    {
      l = left++;
      if (l >= r)
        return false;
    }
    else
      return false;

    if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))
      return false;

    thread.tasks.right++;
    return true;
  }
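
  /* Stealing first claims a slot by atomically incrementing the victim's left
     counter, then re-checks it against right: if the queue emptied in the
     meantime the claim is abandoned. try_steal() copies the task into the
     thief's queue, and only then is the thief's right counter bumped, making
     the stolen task visible for local execution. */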

  /* we steal from the left */
  size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()
  {
    if (left >= right) return 0;
    return tasks[left].N;
  }

  void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)
  {
    TaskScheduler::ThreadPool* pool = pair->first;
    size_t threadIndex = pair->second;
    delete pair;
    pool->thread_loop(threadIndex);
  }

  TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)
    : numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}

  dll_export void TaskScheduler::ThreadPool::startThreads()
  {
    if (running) return;
    setNumThreads(numThreads,true);
  }

  void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)
  {
    Lock<MutexSys> lock(g_mutex);
    assert(newNumThreads);
    if (newNumThreads == std::numeric_limits<size_t>::max())
      newNumThreads = (size_t) getNumberOfLogicalThreads();

    numThreads = newNumThreads;
    if (!startThreads && !running) return;
    running = true;
    size_t numThreadsActive = numThreadsRunning;

    mutex.lock();
    numThreadsRunning = newNumThreads;
    mutex.unlock();
    condition.notify_all();

    /* start new threads */
    for (size_t t=numThreadsActive; t<numThreads; t++)
    {
      if (t == 0) continue;
      auto pair = new std::pair<TaskScheduler::ThreadPool*,size_t>(this,t);
      threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));
    }

    /* stop some threads if we reduce the number of threads */
    for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {
      if (t == 0) continue;
      embree::join(threads.back());
      threads.pop_back();
    }
  }
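
  /* Slot 0 is reserved for the calling application thread, which is why both
     the grow and shrink loops skip t == 0: only indices 1..numThreads-1 are
     backed by pool-owned OS threads (created with 4 MiB stacks and optional
     affinity pinning). */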

  TaskScheduler::ThreadPool::~ThreadPool()
  {
    /* leave all taskschedulers */
    mutex.lock();
    numThreadsRunning = 0;
    mutex.unlock();
    condition.notify_all();

    /* wait for threads to terminate */
    for (size_t i=0; i<threads.size(); i++)
      embree::join(threads[i]);
  }

  dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)
  {
    mutex.lock();
    schedulers.push_back(scheduler);
    mutex.unlock();
    condition.notify_all();
  }

  dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)
  {
    Lock<MutexSys> lock(mutex);
    for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {
      if (scheduler == *it) {
        schedulers.erase(it);
        return;
      }
    }
  }

  void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)
  {
    while (globalThreadIndex < numThreadsRunning)
    {
      Ref<TaskScheduler> scheduler = NULL;
      ssize_t threadIndex = -1;
      {
        Lock<MutexSys> lock(mutex);
        condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });
        if (globalThreadIndex >= numThreadsRunning) break;
        scheduler = schedulers.front();
        threadIndex = scheduler->allocThreadIndex();
      }
      scheduler->thread_loop(threadIndex);
    }
  }
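
  /* Each pool thread blocks until some scheduler is registered, then serves
     the scheduler at the front of the list until that scheduler's work is
     done, and loops back for the next one. Shrinking numThreadsRunning wakes
     the waiters so surplus threads can exit. */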

  TaskScheduler::TaskScheduler()
    : threadCounter(0), anyTasksRunning(0), hasRootTask(false)
  {
    assert(threadPool);
    threadLocal.resize(2 * TaskScheduler::threadCount()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.
    for (size_t i=0; i<threadLocal.size(); i++)
      threadLocal[i].store(nullptr);
  }

  TaskScheduler::~TaskScheduler()
  {
    assert(threadCounter == 0);
  }

  dll_export size_t TaskScheduler::threadID()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else return 0;
  }

  dll_export size_t TaskScheduler::threadIndex()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else return 0;
  }

  dll_export size_t TaskScheduler::threadCount() {
    return threadPool->size();
  }

  dll_export TaskScheduler* TaskScheduler::instance()
  {
    if (g_instance == NULL) {
      Lock<MutexSys> lock(g_mutex);
      g_instance = new TaskScheduler;
      g_instance_vector.push_back(g_instance);
    }
    return g_instance;
  }
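
  /* g_instance is thread-local, so no double-checked locking is needed here;
     the lock only serializes access to the shared g_instance_vector, which
     holds a reference to every created scheduler and thereby keeps these
     per-thread instances alive. */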

  void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)
  {
    if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);
    threadPool->setNumThreads(numThreads,start_threads);
  }

  void TaskScheduler::destroy() {
    delete threadPool; threadPool = nullptr;
  }
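
  /* Lifecycle sketch (illustrative, not part of this file): a typical embree
     device creates the shared pool once at startup and tears it down at
     shutdown, e.g.

       TaskScheduler::create(numThreads, set_affinity, start_threads);
       ...run builds, each driven by its own TaskScheduler instance...
       TaskScheduler::destroy();

     Passing std::numeric_limits<size_t>::max() as numThreads selects one
     thread per logical core (see setNumThreads above). */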

  dll_export ssize_t TaskScheduler::allocThreadIndex()
  {
    size_t threadIndex = threadCounter++;
    assert(threadIndex < threadLocal.size());
    return threadIndex;
  }

  void TaskScheduler::join()
  {
    mutex.lock();
    size_t threadIndex = allocThreadIndex();
    condition.wait(mutex, [&] () { return hasRootTask.load(); });
    mutex.unlock();
    thread_loop(threadIndex);
  }
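
  /* join() lets an application thread enter the scheduler as an extra worker:
     it reserves a thread index, blocks until a root task exists (hasRootTask),
     and then runs the same thread_loop as the pool threads. */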

  void TaskScheduler::reset() {
    hasRootTask = false;
  }

  void TaskScheduler::wait_for_threads(size_t threadCount)
  {
    while (threadCounter < threadCount-1)
      pause_cpu();
  }

  dll_export TaskScheduler::Thread* TaskScheduler::thread() {
    return thread_local_thread;
  }

  dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)
  {
    Thread* old = thread_local_thread;
    thread_local_thread = thread;
    return old;
  }

  dll_export void TaskScheduler::wait()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread == nullptr)
      return;
    while (thread->tasks.execute_local_internal(*thread,thread->task)) {};
  }

  void TaskScheduler::thread_loop(size_t threadIndex)
  {
    /* allocate thread structure */
    std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
    Thread& thread = *mthread;
    threadLocal[threadIndex].store(&thread);
    Thread* oldThread = swapThread(&thread);

    /* main thread loop */
    while (anyTasksRunning)
    {
      steal_loop(thread,
                 [&] () { return anyTasksRunning > 0; },
                 [&] () {
                   anyTasksRunning++;
                   while (thread.tasks.execute_local_internal(thread,nullptr));
                   anyTasksRunning--;
                 });
    }
    threadLocal[threadIndex].store(nullptr);
    swapThread(oldThread);

    /* wait for all threads to terminate */
    threadCounter--;
#if defined(__WIN32__)
    size_t loopIndex = 1;
#endif
#define LOOP_YIELD_THRESHOLD (4096)
    while (threadCounter > 0) {
#if defined(__WIN32__)
      if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)
        yield();
      else
        _mm_pause();
      loopIndex++;
#else
      yield();
#endif
    }
  }
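
  /* The final spin appears to keep each Thread structure alive until every
     worker has left the loop: another thread may still hold a pointer loaded
     from threadLocal[] and attempt a steal, so destroying mthread before
     threadCounter reaches zero would be unsafe. On Windows the spin mixes
     _mm_pause() with an occasional yield() to stay responsive. */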

  bool TaskScheduler::steal_from_other_threads(Thread& thread)
  {
    const size_t threadIndex = thread.threadIndex;
    const size_t threadCount = this->threadCounter;

    for (size_t i=1; i<threadCount; i++)
    {
      pause_cpu(32);
      size_t otherThreadIndex = threadIndex+i;
      if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;

      Thread* othread = threadLocal[otherThreadIndex].load();
      if (!othread)
        continue;

      if (othread->tasks.steal(thread))
        return true;
    }

    return false;
  }
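
  /* Victim selection is round-robin starting at the thread's own successor
     (threadIndex+1, wrapping around), so concurrent thieves start at
     different victims and contention on any single queue is reduced. */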

  dll_export void TaskScheduler::startThreads() {
    threadPool->startThreads();
  }

  dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->add(scheduler);
  }

  dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->remove(scheduler);
  }

  RTC_NAMESPACE_END
}