Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
godotengine
GitHub Repository: godotengine/godot
Path: blob/master/thirdparty/jolt_physics/Jolt/Core/JobSystemWithBarrier.cpp
9906 views
1
// Jolt Physics Library (https://github.com/jrouwe/JoltPhysics)
2
// SPDX-FileCopyrightText: 2023 Jorrit Rouwe
3
// SPDX-License-Identifier: MIT
4
5
#include <Jolt/Jolt.h>
6
7
#include <Jolt/Core/JobSystemWithBarrier.h>
8
#include <Jolt/Core/Profiler.h>
9
10
JPH_SUPPRESS_WARNINGS_STD_BEGIN
11
#include <thread>
12
JPH_SUPPRESS_WARNINGS_STD_END
13
14
JPH_NAMESPACE_BEGIN
15
16
JobSystemWithBarrier::BarrierImpl::BarrierImpl()
17
{
18
for (atomic<Job *> &j : mJobs)
19
j = nullptr;
20
}
21
22
JobSystemWithBarrier::BarrierImpl::~BarrierImpl()
23
{
24
JPH_ASSERT(IsEmpty());
25
}
26
27
void JobSystemWithBarrier::BarrierImpl::AddJob(const JobHandle &inJob)
28
{
29
JPH_PROFILE_FUNCTION();
30
31
bool release_semaphore = false;
32
33
// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
34
Job *job = inJob.GetPtr();
35
if (job->SetBarrier(this))
36
{
37
// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
38
mNumToAcquire++;
39
if (job->CanBeExecuted())
40
{
41
release_semaphore = true;
42
mNumToAcquire++;
43
}
44
45
// Add the job to our job list
46
job->AddRef();
47
uint write_index = mJobWriteIndex++;
48
while (write_index - mJobReadIndex >= cMaxJobs)
49
{
50
JPH_ASSERT(false, "Barrier full, stalling!");
51
std::this_thread::sleep_for(std::chrono::microseconds(100));
52
}
53
mJobs[write_index & (cMaxJobs - 1)] = job;
54
}
55
56
// Notify waiting thread that a new executable job is available
57
if (release_semaphore)
58
mSemaphore.Release();
59
}
60
61
void JobSystemWithBarrier::BarrierImpl::AddJobs(const JobHandle *inHandles, uint inNumHandles)
62
{
63
JPH_PROFILE_FUNCTION();
64
65
bool release_semaphore = false;
66
67
for (const JobHandle *handle = inHandles, *handles_end = inHandles + inNumHandles; handle < handles_end; ++handle)
68
{
69
// Set the barrier on the job, this returns true if the barrier was successfully set (otherwise the job is already done and we don't need to add it to our list)
70
Job *job = handle->GetPtr();
71
if (job->SetBarrier(this))
72
{
73
// If the job can be executed we want to release the semaphore an extra time to allow the waiting thread to start executing it
74
mNumToAcquire++;
75
if (!release_semaphore && job->CanBeExecuted())
76
{
77
release_semaphore = true;
78
mNumToAcquire++;
79
}
80
81
// Add the job to our job list
82
job->AddRef();
83
uint write_index = mJobWriteIndex++;
84
while (write_index - mJobReadIndex >= cMaxJobs)
85
{
86
JPH_ASSERT(false, "Barrier full, stalling!");
87
std::this_thread::sleep_for(std::chrono::microseconds(100));
88
}
89
mJobs[write_index & (cMaxJobs - 1)] = job;
90
}
91
}
92
93
// Notify waiting thread that a new executable job is available
94
if (release_semaphore)
95
mSemaphore.Release();
96
}
97
98
void JobSystemWithBarrier::BarrierImpl::OnJobFinished(Job *inJob)
99
{
100
JPH_PROFILE_FUNCTION();
101
102
mSemaphore.Release();
103
}
104
105
void JobSystemWithBarrier::BarrierImpl::Wait()
106
{
107
while (mNumToAcquire > 0)
108
{
109
{
110
JPH_PROFILE("Execute Jobs");
111
112
// Go through all jobs
113
bool has_executed;
114
do
115
{
116
has_executed = false;
117
118
// Loop through the jobs and erase jobs from the beginning of the list that are done
119
while (mJobReadIndex < mJobWriteIndex)
120
{
121
atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
122
Job *job_ptr = job.load();
123
if (job_ptr == nullptr || !job_ptr->IsDone())
124
break;
125
126
// Job is finished, release it
127
job_ptr->Release();
128
job = nullptr;
129
++mJobReadIndex;
130
}
131
132
// Loop through the jobs and execute the first executable job
133
for (uint index = mJobReadIndex; index < mJobWriteIndex; ++index)
134
{
135
const atomic<Job *> &job = mJobs[index & (cMaxJobs - 1)];
136
Job *job_ptr = job.load();
137
if (job_ptr != nullptr && job_ptr->CanBeExecuted())
138
{
139
// This will only execute the job if it has not already executed
140
job_ptr->Execute();
141
has_executed = true;
142
break;
143
}
144
}
145
146
} while (has_executed);
147
}
148
149
// Wait for another thread to wake us when either there is more work to do or when all jobs have completed.
150
// When there have been multiple releases, we acquire them all at the same time to avoid needlessly spinning on executing jobs.
151
// Note that using GetValue is inherently unsafe since we can read a stale value, but this is not an issue here as this is the only
152
// place where we acquire the semaphore. Other threads only release it, so we can only read a value that is lower or equal to the actual value.
153
int num_to_acquire = max(1, mSemaphore.GetValue());
154
mSemaphore.Acquire(num_to_acquire);
155
mNumToAcquire -= num_to_acquire;
156
}
157
158
// All jobs should be done now, release them
159
while (mJobReadIndex < mJobWriteIndex)
160
{
161
atomic<Job *> &job = mJobs[mJobReadIndex & (cMaxJobs - 1)];
162
Job *job_ptr = job.load();
163
JPH_ASSERT(job_ptr != nullptr && job_ptr->IsDone());
164
job_ptr->Release();
165
job = nullptr;
166
++mJobReadIndex;
167
}
168
}
169
170
void JobSystemWithBarrier::Init(uint inMaxBarriers)
171
{
172
JPH_ASSERT(mBarriers == nullptr); // Already initialized?
173
174
// Init freelist of barriers
175
mMaxBarriers = inMaxBarriers;
176
mBarriers = new BarrierImpl [inMaxBarriers];
177
}
178
179
JobSystemWithBarrier::JobSystemWithBarrier(uint inMaxBarriers)
180
{
181
Init(inMaxBarriers);
182
}
183
184
JobSystemWithBarrier::~JobSystemWithBarrier()
185
{
186
// Ensure that none of the barriers are used
187
#ifdef JPH_ENABLE_ASSERTS
188
for (const BarrierImpl *b = mBarriers, *b_end = mBarriers + mMaxBarriers; b < b_end; ++b)
189
JPH_ASSERT(!b->mInUse);
190
#endif // JPH_ENABLE_ASSERTS
191
delete [] mBarriers;
192
}
193
194
JobSystem::Barrier *JobSystemWithBarrier::CreateBarrier()
195
{
196
JPH_PROFILE_FUNCTION();
197
198
// Find the first unused barrier
199
for (uint32 index = 0; index < mMaxBarriers; ++index)
200
{
201
bool expected = false;
202
if (mBarriers[index].mInUse.compare_exchange_strong(expected, true))
203
return &mBarriers[index];
204
}
205
206
return nullptr;
207
}
208
209
void JobSystemWithBarrier::DestroyBarrier(Barrier *inBarrier)
210
{
211
JPH_PROFILE_FUNCTION();
212
213
// Check that no jobs are in the barrier
214
JPH_ASSERT(static_cast<BarrierImpl *>(inBarrier)->IsEmpty());
215
216
// Flag the barrier as unused
217
bool expected = true;
218
static_cast<BarrierImpl *>(inBarrier)->mInUse.compare_exchange_strong(expected, false);
219
JPH_ASSERT(expected);
220
}
221
222
void JobSystemWithBarrier::WaitForJobs(Barrier *inBarrier)
223
{
224
JPH_PROFILE_FUNCTION();
225
226
// Let our barrier implementation wait for the jobs
227
static_cast<BarrierImpl *>(inBarrier)->Wait();
228
}
229
230
JPH_NAMESPACE_END
231
232