// Source: GitHub repository godotengine/godot, path blob/master/thirdparty/jolt_physics/Jolt/Core/JobSystem.h
1
// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
2
// SPDX-FileCopyrightText: 2021 Jorrit Rouwe
3
// SPDX-License-Identifier: MIT
4
5
#pragma once
6
7
#include <Jolt/Core/Reference.h>
8
#include <Jolt/Core/Color.h>
9
#include <Jolt/Core/Profiler.h>
10
#include <Jolt/Core/NonCopyable.h>
11
#include <Jolt/Core/StaticArray.h>
12
#include <Jolt/Core/Atomics.h>
13
14
JPH_NAMESPACE_BEGIN
15
16
/// A class that allows units of work (Jobs) to be scheduled across multiple threads.
17
/// It allows dependencies between the jobs so that the jobs form a graph.
18
///
19
/// The pattern for using this class is:
20
///
21
/// // Create job system
22
/// JobSystem *job_system = new JobSystemThreadPool(...);
23
///
24
/// // Create some jobs
25
/// JobHandle second_job = job_system->CreateJob("SecondJob", Color::sRed, []() { ... }, 1); // Create a job with 1 dependency
26
/// JobHandle first_job = job_system->CreateJob("FirstJob", Color::sGreen, [second_job]() { ....; second_job.RemoveDependency(); }, 0); // Job can start immediately, will start second job when it's done
27
/// JobHandle third_job = job_system->CreateJob("ThirdJob", Color::sBlue, []() { ... }, 0); // This job can run immediately as well and can run in parallel to job 1 and 2
28
///
29
/// // Add the jobs to the barrier so that we can execute them while we're waiting
30
/// Barrier *barrier = job_system->CreateBarrier();
31
/// barrier->AddJob(first_job);
32
/// barrier->AddJob(second_job);
33
/// barrier->AddJob(third_job);
34
/// job_system->WaitForJobs(barrier);
35
///
36
/// // Clean up
37
/// job_system->DestroyBarrier(barrier);
38
/// delete job_system;
39
///
40
/// Jobs are guaranteed to be started in the order that their dependency counter becomes zero (in case they're scheduled on a background thread)
41
/// or in the order they're added to the barrier (when dependency count is zero and when executing on the thread that calls WaitForJobs).
42
///
43
/// If you want to implement your own job system, inherit from JobSystem and implement:
44
///
45
/// * JobSystem::GetMaxConcurrency - This should return the maximum number of jobs that can run in parallel.
46
/// * JobSystem::CreateJob - This should create a Job object and return it to the caller.
47
/// * JobSystem::FreeJob - This should free the memory associated with the job object. It is called by the Job destructor when it is Release()-ed for the last time.
48
/// * JobSystem::QueueJob/QueueJobs - These should store the job pointer in an internal queue to run immediately (dependencies are tracked internally, this function is called when the job can run).
49
/// The Job objects are reference counted and are guaranteed to stay alive during the QueueJob(s) call. If you store the job in your own data structure you need to call AddRef() to take a reference.
50
/// After the job has been executed you need to call Release() to release the reference. Make sure you no longer dereference the job pointer after calling Release().
51
///
52
/// JobSystem::Barrier is used to track the completion of a set of jobs. Jobs will be created by other jobs and added to the barrier while it is being waited on. This means that you cannot
53
/// create a dependency graph beforehand as the graph changes while jobs are running. Implement the following functions:
54
///
55
/// * Barrier::AddJob/AddJobs - Add a job to the barrier, any call to WaitForJobs will now also wait for this job to complete.
56
/// If you store the job in a data structure in the Barrier you need to call AddRef() on the job to keep it alive and Release() after you're done with it.
57
/// * Barrier::OnJobFinished - This function is called when a job has finished executing, you can use this to track completion and remove the job from the list of jobs to wait on.
58
///
59
/// The functions on JobSystem that need to be implemented to support barriers are:
60
///
61
/// * JobSystem::CreateBarrier - Create a new barrier.
62
/// * JobSystem::DestroyBarrier - Destroy a barrier.
63
/// * JobSystem::WaitForJobs - This is the main function that is used to wait for all jobs that have been added to a Barrier. WaitForJobs can execute jobs that have
64
/// been added to the barrier while waiting. It is not wise to execute other jobs that touch physics structures as this can cause race conditions and deadlocks. Please keep in mind that the barrier is
65
/// only intended to wait on the completion of the Jolt jobs added to it, if you scheduled any jobs in your engine's job system to execute the Jolt jobs as part of QueueJob/QueueJobs, you might still need
66
/// to wait for these in this function after the barrier is finished waiting.
67
///
68
/// An example implementation is JobSystemThreadPool. If you don't want to write the Barrier class you can also inherit from JobSystemWithBarrier.
69
class JPH_EXPORT JobSystem : public NonCopyable
70
{
71
protected:
72
class Job;
73
74
public:
75
JPH_OVERRIDE_NEW_DELETE
76
77
/// A job handle contains a reference to a job. The job will be deleted as soon as there are no JobHandles.
78
/// referring to the job and when it is not in the job queue / being processed.
79
class JobHandle : private Ref<Job>
80
{
81
public:
82
/// Constructor
83
inline JobHandle() = default;
84
inline JobHandle(const JobHandle &inHandle) = default;
85
inline JobHandle(JobHandle &&inHandle) noexcept : Ref<Job>(std::move(inHandle)) { }
86
87
/// Constructor, only to be used by JobSystem
88
inline explicit JobHandle(Job *inJob) : Ref<Job>(inJob) { }
89
90
/// Assignment
91
inline JobHandle & operator = (const JobHandle &inHandle) = default;
92
inline JobHandle & operator = (JobHandle &&inHandle) noexcept = default;
93
94
/// Check if this handle contains a job
95
inline bool IsValid() const { return GetPtr() != nullptr; }
96
97
/// Check if this job has finished executing
98
inline bool IsDone() const { return GetPtr() != nullptr && GetPtr()->IsDone(); }
99
100
/// Add to the dependency counter.
101
inline void AddDependency(int inCount = 1) const { GetPtr()->AddDependency(inCount); }
102
103
/// Remove from the dependency counter. Job will start whenever the dependency counter reaches zero
104
/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
105
inline void RemoveDependency(int inCount = 1) const { GetPtr()->RemoveDependencyAndQueue(inCount); }
106
107
/// Remove a dependency from a batch of jobs at once, this can be more efficient than removing them one by one as it requires less locking
108
static inline void sRemoveDependencies(const JobHandle *inHandles, uint inNumHandles, int inCount = 1);
109
110
/// Helper function to remove dependencies on a static array of job handles
111
template <uint N>
112
static inline void sRemoveDependencies(StaticArray<JobHandle, N> &inHandles, int inCount = 1)
113
{
114
sRemoveDependencies(inHandles.data(), inHandles.size(), inCount);
115
}
116
117
/// Inherit the GetPtr function, only to be used by the JobSystem
118
using Ref<Job>::GetPtr;
119
};
120
121
/// A job barrier keeps track of a number of jobs and allows waiting until they are all completed.
122
class Barrier : public NonCopyable
123
{
124
public:
125
JPH_OVERRIDE_NEW_DELETE
126
127
/// Add a job to this barrier
128
/// Note that jobs can keep being added to the barrier while waiting for the barrier
129
virtual void AddJob(const JobHandle &inJob) = 0;
130
131
/// Add multiple jobs to this barrier
132
/// Note that jobs can keep being added to the barrier while waiting for the barrier
133
virtual void AddJobs(const JobHandle *inHandles, uint inNumHandles) = 0;
134
135
protected:
136
/// Job needs to be able to call OnJobFinished
137
friend class Job;
138
139
/// Destructor, you should call JobSystem::DestroyBarrier instead of destructing this object directly
140
virtual ~Barrier() = default;
141
142
/// Called by a Job to mark that it is finished
143
virtual void OnJobFinished(Job *inJob) = 0;
144
};
145
146
/// Main function of the job
147
using JobFunction = function<void()>;
148
149
/// Destructor
150
virtual ~JobSystem() = default;
151
152
/// Get maximum number of concurrently executing jobs
153
virtual int GetMaxConcurrency() const = 0;
154
155
/// Create a new job, the job is started immediately if inNumDependencies == 0 otherwise it starts when
156
/// RemoveDependency causes the dependency counter to reach 0.
157
virtual JobHandle CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) = 0;
158
159
/// Create a new barrier, used to wait on jobs
160
virtual Barrier * CreateBarrier() = 0;
161
162
/// Destroy a barrier when it is no longer used. The barrier should be empty at this point.
163
virtual void DestroyBarrier(Barrier *inBarrier) = 0;
164
165
/// Wait for a set of jobs to be finished, note that only 1 thread can be waiting on a barrier at a time
166
virtual void WaitForJobs(Barrier *inBarrier) = 0;
167
168
protected:
169
/// A class that contains information for a single unit of work
170
class Job
171
{
172
public:
173
JPH_OVERRIDE_NEW_DELETE
174
175
/// Constructor
176
Job([[maybe_unused]] const char *inJobName, [[maybe_unused]] ColorArg inColor, JobSystem *inJobSystem, const JobFunction &inJobFunction, uint32 inNumDependencies) :
177
#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
178
mJobName(inJobName),
179
mColor(inColor),
180
#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
181
mJobSystem(inJobSystem),
182
mJobFunction(inJobFunction),
183
mNumDependencies(inNumDependencies)
184
{
185
}
186
187
/// Get the jobs system to which this job belongs
188
inline JobSystem * GetJobSystem() { return mJobSystem; }
189
190
/// Add or release a reference to this object
191
inline void AddRef()
192
{
193
// Adding a reference can use relaxed memory ordering
194
mReferenceCount.fetch_add(1, memory_order_relaxed);
195
}
196
inline void Release()
197
{
198
#ifndef JPH_TSAN_ENABLED
199
// Releasing a reference must use release semantics...
200
if (mReferenceCount.fetch_sub(1, memory_order_release) == 1)
201
{
202
// ... so that we can use acquire to ensure that we see any updates from other threads that released a ref before freeing the job
203
atomic_thread_fence(memory_order_acquire);
204
mJobSystem->FreeJob(this);
205
}
206
#else
207
// But under TSAN, we cannot use atomic_thread_fence, so we use an acq_rel operation unconditionally instead
208
if (mReferenceCount.fetch_sub(1, memory_order_acq_rel) == 1)
209
mJobSystem->FreeJob(this);
210
#endif
211
}
212
213
/// Add to the dependency counter.
214
inline void AddDependency(int inCount);
215
216
/// Remove from the dependency counter. Returns true whenever the dependency counter reaches zero
217
/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
218
inline bool RemoveDependency(int inCount);
219
220
/// Remove from the dependency counter. Job will be queued whenever the dependency counter reaches zero
221
/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.
222
inline void RemoveDependencyAndQueue(int inCount);
223
224
/// Set the job barrier that this job belongs to and returns false if this was not possible because the job already finished
225
inline bool SetBarrier(Barrier *inBarrier)
226
{
227
intptr_t barrier = 0;
228
if (mBarrier.compare_exchange_strong(barrier, reinterpret_cast<intptr_t>(inBarrier), memory_order_relaxed))
229
return true;
230
JPH_ASSERT(barrier == cBarrierDoneState, "A job can only belong to 1 barrier");
231
return false;
232
}
233
234
/// Run the job function, returns the number of dependencies that this job still has or cExecutingState or cDoneState
235
inline uint32 Execute()
236
{
237
// Transition job to executing state
238
uint32 state = 0; // We can only start running with a dependency counter of 0
239
if (!mNumDependencies.compare_exchange_strong(state, cExecutingState, memory_order_acquire))
240
return state; // state is updated by compare_exchange_strong to the current value
241
242
// Run the job function
243
{
244
JPH_PROFILE(mJobName, mColor.GetUInt32());
245
mJobFunction();
246
}
247
248
// Fetch the barrier pointer and exchange it for the done state, so we're sure that no barrier gets set after we want to call the callback
249
intptr_t barrier = mBarrier.load(memory_order_relaxed);
250
for (;;)
251
{
252
if (mBarrier.compare_exchange_weak(barrier, cBarrierDoneState, memory_order_relaxed))
253
break;
254
}
255
JPH_ASSERT(barrier != cBarrierDoneState);
256
257
// Mark job as done
258
state = cExecutingState;
259
mNumDependencies.compare_exchange_strong(state, cDoneState, memory_order_relaxed);
260
JPH_ASSERT(state == cExecutingState);
261
262
// Notify the barrier after we've changed the job to the done state so that any thread reading the state after receiving the callback will see that the job has finished
263
if (barrier != 0)
264
reinterpret_cast<Barrier *>(barrier)->OnJobFinished(this);
265
266
return cDoneState;
267
}
268
269
/// Test if the job can be executed
270
inline bool CanBeExecuted() const { return mNumDependencies.load(memory_order_relaxed) == 0; }
271
272
/// Test if the job finished executing
273
inline bool IsDone() const { return mNumDependencies.load(memory_order_relaxed) == cDoneState; }
274
275
#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
276
/// Get the name of the job
277
const char * GetName() const { return mJobName; }
278
#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
279
280
static constexpr uint32 cExecutingState = 0xe0e0e0e0; ///< Value of mNumDependencies when job is executing
281
static constexpr uint32 cDoneState = 0xd0d0d0d0; ///< Value of mNumDependencies when job is done executing
282
283
static constexpr intptr_t cBarrierDoneState = ~intptr_t(0); ///< Value to use when the barrier has been triggered
284
285
private:
286
#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
287
const char * mJobName; ///< Name of the job
288
Color mColor; ///< Color of the job in the profiler
289
#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)
290
JobSystem * mJobSystem; ///< The job system we belong to
291
atomic<intptr_t> mBarrier = 0; ///< Barrier that this job is associated with (is a Barrier pointer)
292
JobFunction mJobFunction; ///< Main job function
293
atomic<uint32> mReferenceCount = 0; ///< Amount of JobHandles pointing to this job
294
atomic<uint32> mNumDependencies; ///< Amount of jobs that need to complete before this job can run
295
};
296
297
/// Adds a job to the job queue
298
virtual void QueueJob(Job *inJob) = 0;
299
300
/// Adds a number of jobs at once to the job queue
301
virtual void QueueJobs(Job **inJobs, uint inNumJobs) = 0;
302
303
/// Frees a job
304
virtual void FreeJob(Job *inJob) = 0;
305
};
306
307
/// Shorthand so that JobSystem::JobHandle can be referred to as JobHandle outside of the JobSystem class
using JobHandle = JobSystem::JobHandle;
308
309
JPH_NAMESPACE_END
310
311
#include "JobSystem.inl"
312
313