Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
godotengine
GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/jolt_physics/Jolt/Core/JobSystemThreadPool.cpp
9906 views
1
// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
2
// SPDX-FileCopyrightText: 2021 Jorrit Rouwe
3
// SPDX-License-Identifier: MIT
4
5
#include <Jolt/Jolt.h>
6
7
#include <Jolt/Core/JobSystemThreadPool.h>
8
#include <Jolt/Core/Profiler.h>
9
#include <Jolt/Core/FPException.h>
10
11
#ifdef JPH_PLATFORM_WINDOWS
12
JPH_SUPPRESS_WARNING_PUSH
13
JPH_MSVC_SUPPRESS_WARNING(5039) // winbase.h(13179): warning C5039: 'TpSetCallbackCleanupGroup': pointer or reference to potentially throwing function passed to 'extern "C"' function under -EHc. Undefined behavior may occur if this function throws an exception.
14
#ifndef WIN32_LEAN_AND_MEAN
15
#define WIN32_LEAN_AND_MEAN
16
#endif
17
#ifndef JPH_COMPILER_MINGW
18
#include <Windows.h>
19
#else
20
#include <windows.h>
21
#endif
22
23
JPH_SUPPRESS_WARNING_POP
24
#endif
25
#ifdef JPH_PLATFORM_LINUX
26
#include <sys/prctl.h>
27
#endif
28
29
JPH_NAMESPACE_BEGIN
30
31
void JobSystemThreadPool::Init(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
32
{
33
JobSystemWithBarrier::Init(inMaxBarriers);
34
35
// Init freelist of jobs
36
mJobs.Init(inMaxJobs, inMaxJobs);
37
38
// Init queue
39
for (atomic<Job *> &j : mQueue)
40
j = nullptr;
41
42
// Start the worker threads
43
StartThreads(inNumThreads);
44
}
45
46
JobSystemThreadPool::JobSystemThreadPool(uint inMaxJobs, uint inMaxBarriers, int inNumThreads)
47
{
48
Init(inMaxJobs, inMaxBarriers, inNumThreads);
49
}
50
51
void JobSystemThreadPool::StartThreads([[maybe_unused]] int inNumThreads)
52
{
53
#if !defined(JPH_CPU_WASM) || defined(__EMSCRIPTEN_PTHREADS__) // If we're running without threads support we cannot create threads and we ignore the inNumThreads parameter
54
// Auto detect number of threads
55
if (inNumThreads < 0)
56
inNumThreads = thread::hardware_concurrency() - 1;
57
58
// If no threads are requested we're done
59
if (inNumThreads == 0)
60
return;
61
62
// Don't quit the threads
63
mQuit = false;
64
65
// Allocate heads
66
mHeads = reinterpret_cast<atomic<uint> *>(Allocate(sizeof(atomic<uint>) * inNumThreads));
67
for (int i = 0; i < inNumThreads; ++i)
68
mHeads[i] = 0;
69
70
// Start running threads
71
JPH_ASSERT(mThreads.empty());
72
mThreads.reserve(inNumThreads);
73
for (int i = 0; i < inNumThreads; ++i)
74
mThreads.emplace_back([this, i] { ThreadMain(i); });
75
#endif
76
}
77
78
JobSystemThreadPool::~JobSystemThreadPool()
79
{
80
// Stop all worker threads
81
StopThreads();
82
}
83
84
void JobSystemThreadPool::StopThreads()
85
{
86
if (mThreads.empty())
87
return;
88
89
// Signal threads that we want to stop and wake them up
90
mQuit = true;
91
mSemaphore.Release((uint)mThreads.size());
92
93
// Wait for all threads to finish
94
for (thread &t : mThreads)
95
if (t.joinable())
96
t.join();
97
98
// Delete all threads
99
mThreads.clear();
100
101
// Ensure that there are no lingering jobs in the queue
102
for (uint head = 0; head != mTail; ++head)
103
{
104
// Fetch job
105
Job *job_ptr = mQueue[head & (cQueueLength - 1)].exchange(nullptr);
106
if (job_ptr != nullptr)
107
{
108
// And execute it
109
job_ptr->Execute();
110
job_ptr->Release();
111
}
112
}
113
114
// Destroy heads and reset tail
115
Free(mHeads);
116
mHeads = nullptr;
117
mTail = 0;
118
}
119
120
JobHandle JobSystemThreadPool::CreateJob(const char *inJobName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies)
121
{
122
JPH_PROFILE_FUNCTION();
123
124
// Loop until we can get a job from the free list
125
uint32 index;
126
for (;;)
127
{
128
index = mJobs.ConstructObject(inJobName, inColor, this, inJobFunction, inNumDependencies);
129
if (index != AvailableJobs::cInvalidObjectIndex)
130
break;
131
JPH_ASSERT(false, "No jobs available!");
132
std::this_thread::sleep_for(std::chrono::microseconds(100));
133
}
134
Job *job = &mJobs.Get(index);
135
136
// Construct handle to keep a reference, the job is queued below and may immediately complete
137
JobHandle handle(job);
138
139
// If there are no dependencies, queue the job now
140
if (inNumDependencies == 0)
141
QueueJob(job);
142
143
// Return the handle
144
return handle;
145
}
146
147
void JobSystemThreadPool::FreeJob(Job *inJob)
148
{
149
mJobs.DestructObject(inJob);
150
}
151
152
uint JobSystemThreadPool::GetHead() const
153
{
154
// Find the minimal value across all threads
155
uint head = mTail;
156
for (size_t i = 0; i < mThreads.size(); ++i)
157
head = min(head, mHeads[i].load());
158
return head;
159
}
160
161
void JobSystemThreadPool::QueueJobInternal(Job *inJob)
162
{
163
// Add reference to job because we're adding the job to the queue
164
inJob->AddRef();
165
166
// Need to read head first because otherwise the tail can already have passed the head
167
// We read the head outside of the loop since it involves iterating over all threads and we only need to update
168
// it if there's not enough space in the queue.
169
uint head = GetHead();
170
171
for (;;)
172
{
173
// Check if there's space in the queue
174
uint old_value = mTail;
175
if (old_value - head >= cQueueLength)
176
{
177
// We calculated the head outside of the loop, update head (and we also need to update tail to prevent it from passing head)
178
head = GetHead();
179
old_value = mTail;
180
181
// Second check if there's space in the queue
182
if (old_value - head >= cQueueLength)
183
{
184
// Wake up all threads in order to ensure that they can clear any nullptrs they may not have processed yet
185
mSemaphore.Release((uint)mThreads.size());
186
187
// Sleep a little (we have to wait for other threads to update their head pointer in order for us to be able to continue)
188
std::this_thread::sleep_for(std::chrono::microseconds(100));
189
continue;
190
}
191
}
192
193
// Write the job pointer if the slot is empty
194
Job *expected_job = nullptr;
195
bool success = mQueue[old_value & (cQueueLength - 1)].compare_exchange_strong(expected_job, inJob);
196
197
// Regardless of who wrote the slot, we will update the tail (if the successful thread got scheduled out
198
// after writing the pointer we still want to be able to continue)
199
mTail.compare_exchange_strong(old_value, old_value + 1);
200
201
// If we successfully added our job we're done
202
if (success)
203
break;
204
}
205
}
206
207
void JobSystemThreadPool::QueueJob(Job *inJob)
208
{
209
JPH_PROFILE_FUNCTION();
210
211
// If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
212
if (mThreads.empty())
213
return;
214
215
// Queue the job
216
QueueJobInternal(inJob);
217
218
// Wake up thread
219
mSemaphore.Release();
220
}
221
222
void JobSystemThreadPool::QueueJobs(Job **inJobs, uint inNumJobs)
223
{
224
JPH_PROFILE_FUNCTION();
225
226
JPH_ASSERT(inNumJobs > 0);
227
228
// If we have no worker threads, we can't queue the job either. We assume in this case that the job will be added to a barrier and that the barrier will execute the job when it's Wait() function is called.
229
if (mThreads.empty())
230
return;
231
232
// Queue all jobs
233
for (Job **job = inJobs, **job_end = inJobs + inNumJobs; job < job_end; ++job)
234
QueueJobInternal(*job);
235
236
// Wake up threads
237
mSemaphore.Release(min(inNumJobs, (uint)mThreads.size()));
238
}
239
240
#if defined(JPH_PLATFORM_WINDOWS)
241
242
#if !defined(JPH_COMPILER_MINGW) // MinGW doesn't support __try/__except)
243
// Sets the current thread name in MSVC debugger
244
static void RaiseThreadNameException(const char *inName)
245
{
246
#pragma pack(push, 8)
247
248
struct THREADNAME_INFO
249
{
250
DWORD dwType; // Must be 0x1000.
251
LPCSTR szName; // Pointer to name (in user addr space).
252
DWORD dwThreadID; // Thread ID (-1=caller thread).
253
DWORD dwFlags; // Reserved for future use, must be zero.
254
};
255
256
#pragma pack(pop)
257
258
THREADNAME_INFO info;
259
info.dwType = 0x1000;
260
info.szName = inName;
261
info.dwThreadID = (DWORD)-1;
262
info.dwFlags = 0;
263
264
__try
265
{
266
RaiseException(0x406D1388, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR *)&info);
267
}
268
__except(EXCEPTION_EXECUTE_HANDLER)
269
{
270
}
271
}
272
#endif // !JPH_COMPILER_MINGW
273
274
static void SetThreadName(const char* inName)
275
{
276
JPH_SUPPRESS_WARNING_PUSH
277
278
// Suppress casting warning, it's fine here as GetProcAddress doesn't really return a FARPROC
279
JPH_CLANG_SUPPRESS_WARNING("-Wcast-function-type") // error : cast from 'FARPROC' (aka 'long long (*)()') to 'SetThreadDescriptionFunc' (aka 'long (*)(void *, const wchar_t *)') converts to incompatible function type
280
JPH_CLANG_SUPPRESS_WARNING("-Wcast-function-type-strict") // error : cast from 'FARPROC' (aka 'long long (*)()') to 'SetThreadDescriptionFunc' (aka 'long (*)(void *, const wchar_t *)') converts to incompatible function type
281
JPH_MSVC_SUPPRESS_WARNING(4191) // reinterpret_cast' : unsafe conversion from 'FARPROC' to 'SetThreadDescriptionFunc'. Calling this function through the result pointer may cause your program to fail
282
283
using SetThreadDescriptionFunc = HRESULT(WINAPI*)(HANDLE hThread, PCWSTR lpThreadDescription);
284
static SetThreadDescriptionFunc SetThreadDescription = reinterpret_cast<SetThreadDescriptionFunc>(GetProcAddress(GetModuleHandleW(L"Kernel32.dll"), "SetThreadDescription"));
285
286
JPH_SUPPRESS_WARNING_POP
287
288
if (SetThreadDescription)
289
{
290
wchar_t name_buffer[64] = { 0 };
291
if (MultiByteToWideChar(CP_UTF8, 0, inName, -1, name_buffer, sizeof(name_buffer) / sizeof(wchar_t) - 1) == 0)
292
return;
293
294
SetThreadDescription(GetCurrentThread(), name_buffer);
295
}
296
#if !defined(JPH_COMPILER_MINGW)
297
else if (IsDebuggerPresent())
298
RaiseThreadNameException(inName);
299
#endif // !JPH_COMPILER_MINGW
300
}
301
#elif defined(JPH_PLATFORM_LINUX)
302
static void SetThreadName(const char *inName)
303
{
304
JPH_ASSERT(strlen(inName) < 16); // String will be truncated if it is longer
305
prctl(PR_SET_NAME, inName, 0, 0, 0);
306
}
307
#endif // JPH_PLATFORM_LINUX
308
309
void JobSystemThreadPool::ThreadMain(int inThreadIndex)
310
{
311
// Name the thread
312
char name[64];
313
snprintf(name, sizeof(name), "Worker %d", int(inThreadIndex + 1));
314
315
#if defined(JPH_PLATFORM_WINDOWS) || defined(JPH_PLATFORM_LINUX)
316
SetThreadName(name);
317
#endif // JPH_PLATFORM_WINDOWS && !JPH_COMPILER_MINGW
318
319
// Enable floating point exceptions
320
FPExceptionsEnable enable_exceptions;
321
JPH_UNUSED(enable_exceptions);
322
323
JPH_PROFILE_THREAD_START(name);
324
325
// Call the thread init function
326
mThreadInitFunction(inThreadIndex);
327
328
atomic<uint> &head = mHeads[inThreadIndex];
329
330
while (!mQuit)
331
{
332
// Wait for jobs
333
mSemaphore.Acquire();
334
335
{
336
JPH_PROFILE("Executing Jobs");
337
338
// Loop over the queue
339
while (head != mTail)
340
{
341
// Exchange any job pointer we find with a nullptr
342
atomic<Job *> &job = mQueue[head & (cQueueLength - 1)];
343
if (job.load() != nullptr)
344
{
345
Job *job_ptr = job.exchange(nullptr);
346
if (job_ptr != nullptr)
347
{
348
// And execute it
349
job_ptr->Execute();
350
job_ptr->Release();
351
}
352
}
353
head++;
354
}
355
}
356
}
357
358
// Call the thread exit function
359
mThreadExitFunction(inThreadIndex);
360
361
JPH_PROFILE_THREAD_END();
362
}
363
364
JPH_NAMESPACE_END
365
366