Path: blob/master/thirdparty/jolt_physics/Jolt/Core/JobSystem.h
9906 views
// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)1// SPDX-FileCopyrightText: 2021 Jorrit Rouwe2// SPDX-License-Identifier: MIT34#pragma once56#include <Jolt/Core/Reference.h>7#include <Jolt/Core/Color.h>8#include <Jolt/Core/Profiler.h>9#include <Jolt/Core/NonCopyable.h>10#include <Jolt/Core/StaticArray.h>11#include <Jolt/Core/Atomics.h>1213JPH_NAMESPACE_BEGIN1415/// A class that allows units of work (Jobs) to be scheduled across multiple threads.16/// It allows dependencies between the jobs so that the jobs form a graph.17///18/// The pattern for using this class is:19///20/// // Create job system21/// JobSystem *job_system = new JobSystemThreadPool(...);22///23/// // Create some jobs24/// JobHandle second_job = job_system->CreateJob("SecondJob", Color::sRed, []() { ... }, 1); // Create a job with 1 dependency25/// JobHandle first_job = job_system->CreateJob("FirstJob", Color::sGreen, [second_job]() { ....; second_job.RemoveDependency(); }, 0); // Job can start immediately, will start second job when it's done26/// JobHandle third_job = job_system->CreateJob("ThirdJob", Color::sBlue, []() { ... }, 0); // This job can run immediately as well and can run in parallel to job 1 and 227///28/// // Add the jobs to the barrier so that we can execute them while we're waiting29/// Barrier *barrier = job_system->CreateBarrier();30/// barrier->AddJob(first_job);31/// barrier->AddJob(second_job);32/// barrier->AddJob(third_job);33/// job_system->WaitForJobs(barrier);34///35/// // Clean up36/// job_system->DestroyBarrier(barrier);37/// delete job_system;38///39/// Jobs are guaranteed to be started in the order that their dependency counter becomes zero (in case they're scheduled on a background thread)40/// or in the order they're added to the barrier (when dependency count is zero and when executing on the thread that calls WaitForJobs).41///42/// If you want to implement your own job system, inherit from JobSystem and implement:43///44/// * JobSystem::GetMaxConcurrency - This should return the maximum number of jobs that can run in parallel.45/// * JobSystem::CreateJob - This should create a Job object and return it to the caller.46/// * JobSystem::FreeJob - This should free the memory associated with the job object. It is called by the Job destructor when it is Release()-ed for the last time.47/// * JobSystem::QueueJob/QueueJobs - These should store the job pointer in an internal queue to run immediately (dependencies are tracked internally, this function is called when the job can run).48/// The Job objects are reference counted and are guaranteed to stay alive during the QueueJob(s) call. If you store the job in your own data structure you need to call AddRef() to take a reference.49/// After the job has been executed you need to call Release() to release the reference. Make sure you no longer dereference the job pointer after calling Release().50///51/// JobSystem::Barrier is used to track the completion of a set of jobs. Jobs will be created by other jobs and added to the barrier while it is being waited on. This means that you cannot52/// create a dependency graph beforehand as the graph changes while jobs are running. Implement the following functions:53///54/// * Barrier::AddJob/AddJobs - Add a job to the barrier, any call to WaitForJobs will now also wait for this job to complete.55/// If you store the job in a data structure in the Barrier you need to call AddRef() on the job to keep it alive and Release() after you're done with it.56/// * Barrier::OnJobFinished - This function is called when a job has finished executing, you can use this to track completion and remove the job from the list of jobs to wait on.57///58/// The functions on JobSystem that need to be implemented to support barriers are:59///60/// * JobSystem::CreateBarrier - Create a new barrier.61/// * JobSystem::DestroyBarrier - Destroy a barrier.62/// * JobSystem::WaitForJobs - This is the main function that is used to wait for all jobs that have been added to a Barrier. WaitForJobs can execute jobs that have63/// been added to the barrier while waiting. It is not wise to execute other jobs that touch physics structures as this can cause race conditions and deadlocks. Please keep in mind that the barrier is64/// only intended to wait on the completion of the Jolt jobs added to it, if you scheduled any jobs in your engine's job system to execute the Jolt jobs as part of QueueJob/QueueJobs, you might still need65/// to wait for these in this function after the barrier is finished waiting.66///67/// An example implementation is JobSystemThreadPool. If you don't want to write the Barrier class you can also inherit from JobSystemWithBarrier.68class JPH_EXPORT JobSystem : public NonCopyable69{70protected:71class Job;7273public:74JPH_OVERRIDE_NEW_DELETE7576/// A job handle contains a reference to a job. The job will be deleted as soon as there are no JobHandles.77/// referring to the job and when it is not in the job queue / being processed.78class JobHandle : private Ref<Job>79{80public:81/// Constructor82inline JobHandle() = default;83inline JobHandle(const JobHandle &inHandle) = default;84inline JobHandle(JobHandle &&inHandle) noexcept : Ref<Job>(std::move(inHandle)) { }8586/// Constructor, only to be used by JobSystem87inline explicit JobHandle(Job *inJob) : Ref<Job>(inJob) { }8889/// Assignment90inline JobHandle & operator = (const JobHandle &inHandle) = default;91inline JobHandle & operator = (JobHandle &&inHandle) noexcept = default;9293/// Check if this handle contains a job94inline bool IsValid() const { return GetPtr() != nullptr; }9596/// Check if this job has finished executing97inline bool IsDone() const { return GetPtr() != nullptr && GetPtr()->IsDone(); }9899/// Add to the dependency counter.100inline void AddDependency(int inCount = 1) const { GetPtr()->AddDependency(inCount); }101102/// Remove from the dependency counter. Job will start whenever the dependency counter reaches zero103/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.104inline void RemoveDependency(int inCount = 1) const { GetPtr()->RemoveDependencyAndQueue(inCount); }105106/// Remove a dependency from a batch of jobs at once, this can be more efficient than removing them one by one as it requires less locking107static inline void sRemoveDependencies(const JobHandle *inHandles, uint inNumHandles, int inCount = 1);108109/// Helper function to remove dependencies on a static array of job handles110template <uint N>111static inline void sRemoveDependencies(StaticArray<JobHandle, N> &inHandles, int inCount = 1)112{113sRemoveDependencies(inHandles.data(), inHandles.size(), inCount);114}115116/// Inherit the GetPtr function, only to be used by the JobSystem117using Ref<Job>::GetPtr;118};119120/// A job barrier keeps track of a number of jobs and allows waiting until they are all completed.121class Barrier : public NonCopyable122{123public:124JPH_OVERRIDE_NEW_DELETE125126/// Add a job to this barrier127/// Note that jobs can keep being added to the barrier while waiting for the barrier128virtual void AddJob(const JobHandle &inJob) = 0;129130/// Add multiple jobs to this barrier131/// Note that jobs can keep being added to the barrier while waiting for the barrier132virtual void AddJobs(const JobHandle *inHandles, uint inNumHandles) = 0;133134protected:135/// Job needs to be able to call OnJobFinished136friend class Job;137138/// Destructor, you should call JobSystem::DestroyBarrier instead of destructing this object directly139virtual ~Barrier() = default;140141/// Called by a Job to mark that it is finished142virtual void OnJobFinished(Job *inJob) = 0;143};144145/// Main function of the job146using JobFunction = function<void()>;147148/// Destructor149virtual ~JobSystem() = default;150151/// Get maximum number of concurrently executing jobs152virtual int GetMaxConcurrency() const = 0;153154/// Create a new job, the job is started immediately if inNumDependencies == 0 otherwise it starts when155/// RemoveDependency causes the dependency counter to reach 0.156virtual JobHandle CreateJob(const char *inName, ColorArg inColor, const JobFunction &inJobFunction, uint32 inNumDependencies = 0) = 0;157158/// Create a new barrier, used to wait on jobs159virtual Barrier * CreateBarrier() = 0;160161/// Destroy a barrier when it is no longer used. The barrier should be empty at this point.162virtual void DestroyBarrier(Barrier *inBarrier) = 0;163164/// Wait for a set of jobs to be finished, note that only 1 thread can be waiting on a barrier at a time165virtual void WaitForJobs(Barrier *inBarrier) = 0;166167protected:168/// A class that contains information for a single unit of work169class Job170{171public:172JPH_OVERRIDE_NEW_DELETE173174/// Constructor175Job([[maybe_unused]] const char *inJobName, [[maybe_unused]] ColorArg inColor, JobSystem *inJobSystem, const JobFunction &inJobFunction, uint32 inNumDependencies) :176#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)177mJobName(inJobName),178mColor(inColor),179#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)180mJobSystem(inJobSystem),181mJobFunction(inJobFunction),182mNumDependencies(inNumDependencies)183{184}185186/// Get the jobs system to which this job belongs187inline JobSystem * GetJobSystem() { return mJobSystem; }188189/// Add or release a reference to this object190inline void AddRef()191{192// Adding a reference can use relaxed memory ordering193mReferenceCount.fetch_add(1, memory_order_relaxed);194}195inline void Release()196{197#ifndef JPH_TSAN_ENABLED198// Releasing a reference must use release semantics...199if (mReferenceCount.fetch_sub(1, memory_order_release) == 1)200{201// ... so that we can use acquire to ensure that we see any updates from other threads that released a ref before freeing the job202atomic_thread_fence(memory_order_acquire);203mJobSystem->FreeJob(this);204}205#else206// But under TSAN, we cannot use atomic_thread_fence, so we use an acq_rel operation unconditionally instead207if (mReferenceCount.fetch_sub(1, memory_order_acq_rel) == 1)208mJobSystem->FreeJob(this);209#endif210}211212/// Add to the dependency counter.213inline void AddDependency(int inCount);214215/// Remove from the dependency counter. Returns true whenever the dependency counter reaches zero216/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.217inline bool RemoveDependency(int inCount);218219/// Remove from the dependency counter. Job will be queued whenever the dependency counter reaches zero220/// and if it does it is no longer valid to call the AddDependency/RemoveDependency functions.221inline void RemoveDependencyAndQueue(int inCount);222223/// Set the job barrier that this job belongs to and returns false if this was not possible because the job already finished224inline bool SetBarrier(Barrier *inBarrier)225{226intptr_t barrier = 0;227if (mBarrier.compare_exchange_strong(barrier, reinterpret_cast<intptr_t>(inBarrier), memory_order_relaxed))228return true;229JPH_ASSERT(barrier == cBarrierDoneState, "A job can only belong to 1 barrier");230return false;231}232233/// Run the job function, returns the number of dependencies that this job still has or cExecutingState or cDoneState234inline uint32 Execute()235{236// Transition job to executing state237uint32 state = 0; // We can only start running with a dependency counter of 0238if (!mNumDependencies.compare_exchange_strong(state, cExecutingState, memory_order_acquire))239return state; // state is updated by compare_exchange_strong to the current value240241// Run the job function242{243JPH_PROFILE(mJobName, mColor.GetUInt32());244mJobFunction();245}246247// Fetch the barrier pointer and exchange it for the done state, so we're sure that no barrier gets set after we want to call the callback248intptr_t barrier = mBarrier.load(memory_order_relaxed);249for (;;)250{251if (mBarrier.compare_exchange_weak(barrier, cBarrierDoneState, memory_order_relaxed))252break;253}254JPH_ASSERT(barrier != cBarrierDoneState);255256// Mark job as done257state = cExecutingState;258mNumDependencies.compare_exchange_strong(state, cDoneState, memory_order_relaxed);259JPH_ASSERT(state == cExecutingState);260261// Notify the barrier after we've changed the job to the done state so that any thread reading the state after receiving the callback will see that the job has finished262if (barrier != 0)263reinterpret_cast<Barrier *>(barrier)->OnJobFinished(this);264265return cDoneState;266}267268/// Test if the job can be executed269inline bool CanBeExecuted() const { return mNumDependencies.load(memory_order_relaxed) == 0; }270271/// Test if the job finished executing272inline bool IsDone() const { return mNumDependencies.load(memory_order_relaxed) == cDoneState; }273274#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)275/// Get the name of the job276const char * GetName() const { return mJobName; }277#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)278279static constexpr uint32 cExecutingState = 0xe0e0e0e0; ///< Value of mNumDependencies when job is executing280static constexpr uint32 cDoneState = 0xd0d0d0d0; ///< Value of mNumDependencies when job is done executing281282static constexpr intptr_t cBarrierDoneState = ~intptr_t(0); ///< Value to use when the barrier has been triggered283284private:285#if defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)286const char * mJobName; ///< Name of the job287Color mColor; ///< Color of the job in the profiler288#endif // defined(JPH_EXTERNAL_PROFILE) || defined(JPH_PROFILE_ENABLED)289JobSystem * mJobSystem; ///< The job system we belong to290atomic<intptr_t> mBarrier = 0; ///< Barrier that this job is associated with (is a Barrier pointer)291JobFunction mJobFunction; ///< Main job function292atomic<uint32> mReferenceCount = 0; ///< Amount of JobHandles pointing to this job293atomic<uint32> mNumDependencies; ///< Amount of jobs that need to complete before this job can run294};295296/// Adds a job to the job queue297virtual void QueueJob(Job *inJob) = 0;298299/// Adds a number of jobs at once to the job queue300virtual void QueueJobs(Job **inJobs, uint inNumJobs) = 0;301302/// Frees a job303virtual void FreeJob(Job *inJob) = 0;304};305306using JobHandle = JobSystem::JobHandle;307308JPH_NAMESPACE_END309310#include "JobSystem.inl"311312313