// Path: blob/master/thirdparty/jolt_physics/Jolt/Core/JobSystemWithBarrier.cpp
// 9906 views
// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)1// SPDX-FileCopyrightText: 2023 Jorrit Rouwe2// SPDX-License-Identifier: MIT34#include <Jolt/Jolt.h>56#include <Jolt/Core/JobSystemWithBarrier.h>7#include <Jolt/Core/Profiler.h>89JPH_SUPPRESS_WARNINGS_STD_BEGIN10#include <thread>11JPH_SUPPRESS_WARNINGS_STD_END1213JPH_NAMESPACE_BEGIN1415JobSystemWithBarrier::BarrierImpl::BarrierImpl()16{17for (atomic<Job *> &j : mJobs)18j = nullptr;19}2021JobSystemWithBarrier::BarrierImpl::~BarrierImpl()22{23JPH_ASSERT(IsEmpty());24}2526void JobSystemWithBarrier::BarrierImpl::AddJob(const JobHandle &inJob)27{28JPH_PROFILE_FUNCTION();2930bool release_semaphore = false;3132// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)33Job *job = inJob.GetPtr();34if (job->SetBarrier(this))35{36// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it37mNumToAcquire++;38if (job->CanBeExecuted())39{40release_semaphore = true;41mNumToAcquire++;42}4344// Add the job to our job list45job->AddRef();46uint write_index = mJobWriteIndex++;47while (write_index - mJobReadIndex >= cMaxJobs)48{49JPH_ASSERT(false, "Barrier full, stalling!");50std::this_thread::sleep_for(std::chrono::microseconds(100));51}52mJobs[write_index & (cMaxJobs - 1)] = job;53}5455// Notify waiting thread that a new executable job is available56if (release_semaphore)57mSemaphore.Release();58}5960void JobSystemWithBarrier::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)61{62JPH_PROFILE_FUNCTION();6364bool release_semaphore = false;6566for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)67{68// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)69Job *job = 
handle->GetPtr();70if (job->SetBarrier(this))71{72// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it73mNumToAcquire++;74if (!release_semaphore && job->CanBeExecuted())75{76release_semaphore = true;77mNumToAcquire++;78}7980// Add the job to our job list81job->AddRef();82uint write_index = mJobWriteIndex++;83while (write_index - mJobReadIndex >= cMaxJobs)84{85JPH_ASSERT(false, "Barrier full, stalling!");86std::this_thread::sleep_for(std::chrono::microseconds(100));87}88mJobs[write_index & (cMaxJobs - 1)] = job;89}90}9192// Notify waiting thread that a new executable job is available93if (release_semaphore)94mSemaphore.Release();95}9697void JobSystemWithBarrier::BarrierImpl::OnJobFinished(Job *inJob)98{99JPH_PROFILE_FUNCTION();100101mSemaphore.Release();102}103104void JobSystemWithBarrier::BarrierImpl::Wait()105{106while (mNumToAcquire > 0)107{108{109JPH_PROFILE("Execute Jobs");110111// Go through all jobs112bool has_executed;113do114{115has_executed = false;116117// Loop through the jobs and erase jobs from the beginning of the list that are done118while (mJobReadIndex < mJobWriteIndex)119{120atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];121Job *job_ptr = job.load();122if (job_ptr == nullptr || !job_ptr->IsDone())123break;124125// Job is finished, release it126job_ptr->Release();127job = nullptr;128++mJobReadIndex;129}130131// Loop through the jobs and execute the first executable job132for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)133{134const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];135Job *job_ptr = job.load();136if (job_ptr != nullptr && job_ptr->CanBeExecuted())137{138// This will only execute the job if it has not already executed139job_ptr->Execute();140has_executed = true;141break;142}143}144145} while (has_executed);146}147148// Wait for another thread to wake us when either there is more work to do or when all jobs have completed.149// When 
there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs.150// Note that using GetValue is inherently unsafe since we can read a stale value, but this is not an issue here as this is the only151// place where we acquire the semaphore. Other threads only release it, so we can only read a value that is lower or equal to the actual value.152int num_to_acquire = max(1, mSemaphore.GetValue());153mSemaphore.Acquire(num_to_acquire);154mNumToAcquire -= num_to_acquire;155}156157// All jobs should be done now, release them158while (mJobReadIndex < mJobWriteIndex)159{160atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];161Job *job_ptr = job.load();162JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());163job_ptr->Release();164job = nullptr;165++mJobReadIndex;166}167}168169void JobSystemWithBarrier::Init(uint inMaxBarriers)170{171JPH_ASSERT(mBarriers == nullptr); // Already initialized?172173// Init freelist of barriers174mMaxBarriers = inMaxBarriers;175mBarriers = new BarrierImpl [inMaxBarriers];176}177178JobSystemWithBarrier::JobSystemWithBarrier(uint inMaxBarriers)179{180Init(inMaxBarriers);181}182183JobSystemWithBarrier::~JobSystemWithBarrier()184{185// Ensure that none of the barriers are used186#ifdef JPH_ENABLE_ASSERTS187for (const BarrierImpl *b = mBarriers, *b_end = mBarriers + mMaxBarriers; b < b_end; ++b)188JPH_ASSERT(!b->mInUse);189#endif // JPH_ENABLE_ASSERTS190delete [] mBarriers;191}192193JobSystem::Barrier *JobSystemWithBarrier::CreateBarrier()194{195JPH_PROFILE_FUNCTION();196197// Find the first unused barrier198for (uint32 index = 0; index < mMaxBarriers; ++index)199{200bool expected = false;201if (mBarriers[index].mInUse.compare_exchange_strong(expected, true))202return &mBarriers[index];203}204205return nullptr;206}207208void JobSystemWithBarrier::DestroyBarrier(Barrier *inBarrier)209{210JPH_PROFILE_FUNCTION();211212// Check that no jobs are in the 
barrier213JPH_ASSERT(static_cast<BarrierImpl *>(inBarrier)->IsEmpty());214215// Flag the barrier as unused216bool expected = true;217static_cast<BarrierImpl *>(inBarrier)->mInUse.compare_exchange_strong(expected, false);218JPH_ASSERT(expected);219}220221void JobSystemWithBarrier::WaitForJobs(Barrier *inBarrier)222{223JPH_PROFILE_FUNCTION();224225// Let our barrier implementation wait for the jobs226static_cast<BarrierImpl *>(inBarrier)->Wait();227}228229JPH_NAMESPACE_END230231232