GitHub Repository: godotengine/godot
Path: blob/master/core/templates/command_queue_mt.h
/**************************************************************************/
/*  command_queue_mt.h                                                    */
/**************************************************************************/
/*                         This file is part of:                          */
/*                             GODOT ENGINE                               */
/*                        https://godotengine.org                         */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */
/*                                                                        */
/* Permission is hereby granted, free of charge, to any person obtaining  */
/* a copy of this software and associated documentation files (the        */
/* "Software"), to deal in the Software without restriction, including    */
/* without limitation the rights to use, copy, modify, merge, publish,    */
/* distribute, sublicense, and/or sell copies of the Software, and to     */
/* permit persons to whom the Software is furnished to do so, subject to  */
/* the following conditions:                                              */
/*                                                                        */
/* The above copyright notice and this permission notice shall be         */
/* included in all copies or substantial portions of the Software.        */
/*                                                                        */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */
/**************************************************************************/

#pragma once

#include "core/object/worker_thread_pool.h"
#include "core/os/condition_variable.h"
#include "core/os/mutex.h"
#include "core/templates/local_vector.h"
#include "core/templates/simple_type.h"
#include "core/templates/tuple.h"
#include "core/typedefs.h"

class CommandQueueMT {
	static const size_t MAX_COMMAND_SIZE = 1024;

	struct CommandBase {
		bool sync = false;
		virtual void call() = 0;
		virtual ~CommandBase() = default;

		CommandBase(bool p_sync) :
				sync(p_sync) {}
	};

	template <typename T, typename M, bool NeedsSync, typename... Args>
	struct Command : public CommandBase {
		T *instance;
		M method;
		Tuple<GetSimpleTypeT<Args>...> args;

		template <typename... FwdArgs>
		_FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
				CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}

		void call() override {
			call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
			// Move the arguments out of the Tuple; the command is destroyed as soon as the call completes.
			(instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};
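
	// Illustrative sketch (not part of the original file): a call such as
	// queue.push(&server, &Server::draw, item) materializes, conceptually,
	// a Command<Server, void (Server::*)(Item), false, Item> whose call()
	// unpacks the stored Tuple through the index sequence, i.e.:
	//
	//     (instance->*method)(std::move(::tuple_get<0>(args)));
	//
	// "Server", "draw" and "Item" are hypothetical names for this example only.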

	// Separate class from Command so we can save the space of the ret pointer for commands that don't return.
	template <typename T, typename M, typename R, typename... Args>
	struct CommandRet : public CommandBase {
		T *instance;
		M method;
		R *ret;
		Tuple<GetSimpleTypeT<Args>...> args;

		_FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
				CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}

		void call() override {
			*ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
		}

	private:
		template <size_t... I>
		_FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
			// Move the arguments out of the Tuple; the command is destroyed as soon as the call completes.
			return (instance->*method)(std::move(get<I>())...);
		}

		// This method exists so we can call it in the parameter pack expansion in call_impl.
		template <size_t I>
		_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
	};

	/***** BASE *******/

	static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;

	inline static thread_local bool flushing = false;

	BinaryMutex mutex;
	LocalVector<uint8_t> command_mem;
	ConditionVariable sync_cond_var;
	uint32_t sync_head = 0;
	uint32_t sync_tail = 0;
	uint32_t sync_awaiters = 0;
	WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
	uint64_t flush_read_ptr = 0;
	std::atomic<bool> pending{ false };

	template <typename T, typename... Args>
	_FORCE_INLINE_ void create_command(Args &&...p_args) {
		// Each entry is a uint64_t size header followed by the command itself,
		// with the command size rounded up to 8-byte alignment as a safeguard.
		constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
		static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");

		uint64_t size = command_mem.size();
		command_mem.resize(size + alloc_size + sizeof(uint64_t));
		*(uint64_t *)&command_mem[size] = alloc_size;
		void *cmd = &command_mem[size + sizeof(uint64_t)];
		memnew_placement(cmd, T(std::forward<Args>(p_args)...));
		pending.store(true);
	}
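
	// Worked example (illustrative, not in the original file): for a command
	// type with sizeof(T) == 20, alloc_size rounds up to 24 ((20 + 7) & ~7),
	// so the entry occupies 8 (size header) + 24 = 32 bytes of command_mem:
	//
	//     [ uint64_t = 24 ][ 20 bytes of T ][ 4 bytes of padding ]
	//
	// The padding keeps the next entry's size header 8-byte aligned.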

	template <typename T, bool NeedsSync, typename... Args>
	_FORCE_INLINE_ void _push_internal(Args &&...args) {
		MutexLock mlock(mutex);
		create_command<T>(std::forward<Args>(args)...);

		if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
			WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
		}

		if constexpr (NeedsSync) {
			sync_tail++;
			_wait_for_sync(mlock);
		}
	}
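
	// Illustrative note (not in the original file): if a pump task was
	// registered via set_pump_task_id(), notify_yield_over() wakes that
	// WorkerThreadPool task so it can resume draining the queue as soon as
	// a new command is pushed.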

	_FORCE_INLINE_ void _prevent_sync_wraparound() {
		bool safe_to_reset = !sync_awaiters;
		bool already_sync_to_latest = sync_head == sync_tail;
		if (safe_to_reset && already_sync_to_latest) {
			sync_head = 0;
			sync_tail = 0;
		}
	}
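
	// Illustrative note (not in the original file): sync_tail counts sync
	// commands pushed, sync_head counts sync commands completed; a waiter
	// that pushed the 3rd sync command sleeps until sync_head >= 3. Resetting
	// both counters to 0 whenever nobody waits and head == tail prevents the
	// 32-bit counters from ever wrapping around.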

	void _flush() {
		// Safeguard against trying to re-lock the binary mutex.
		if (flushing) {
			return;
		}

		flushing = true;

		MutexLock lock(mutex);

		if (unlikely(flush_read_ptr)) {
			// Another thread is flushing.
			lock.temp_unlock(); // Not really temp.
			sync();
			flushing = false;
			return;
		}

		alignas(uint64_t) char cmd_local_mem[MAX_COMMAND_SIZE];

		while (flush_read_ptr < command_mem.size()) {
			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
			flush_read_ptr += sizeof(uint64_t);

			// Relocate the command into local memory before calling it. While the
			// mutex is unlocked for the call, other threads may push commands and
			// cause command_mem to realloc, which would invalidate any pointer
			// into the buffer.
			CommandBase *cmd_original = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
			CommandBase *cmd_local = reinterpret_cast<CommandBase *>(cmd_local_mem);
			memcpy(cmd_local_mem, (char *)cmd_original, size);

			lock.temp_unlock();
			cmd_local->call();
			lock.temp_relock();

			if (unlikely(cmd_local->sync)) {
				sync_head++;
				lock.temp_unlock(); // Give awaiters an opportunity to wake right away.
				sync_cond_var.notify_all();
				lock.temp_relock();
			}

			cmd_local->~CommandBase();

			flush_read_ptr += size;
		}

		command_mem.clear();
		pending.store(false);
		flush_read_ptr = 0;

		_prevent_sync_wraparound();

		flushing = false;
	}
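
	// Buffer layout sketch (illustrative, not in the original file): during a
	// flush, command_mem is consumed front to back as a sequence of entries,
	//
	//     [ u64 size | command bytes + padding ][ u64 size | ... ] ...
	//
	// and a nonzero flush_read_ptr doubles as the "flush in progress" flag
	// checked at the top of _flush().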

	_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
		sync_awaiters++;
		uint32_t sync_head_goal = sync_tail;
		do {
			sync_cond_var.wait(p_lock);
		} while (sync_head < sync_head_goal);
		sync_awaiters--;
		_prevent_sync_wraparound();
	}

	void _no_op() {}

public:
	template <typename T, typename M, typename... Args>
	void push(T *p_instance, M p_method, Args &&...p_args) {
		// Standard command, no sync.
		using CommandType = Command<T, M, false, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	template <typename T, typename M, typename... Args>
	void push_and_sync(T *p_instance, M p_method, Args... p_args) {
		// Standard command, sync.
		using CommandType = Command<T, M, true, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
	}

	template <typename T, typename M, typename R, typename... Args>
	void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
		// Command with return value, sync.
		using CommandType = CommandRet<T, M, R, Args...>;
		static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE);
		_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
	}

	_FORCE_INLINE_ void flush_if_pending() {
		if (unlikely(pending.load())) {
			_flush();
		}
	}

	void flush_all() {
		_flush();
	}

	void sync() {
		push_and_sync(this, &CommandQueueMT::_no_op);
	}

	void wait_and_flush() {
		ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
		WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
		_flush();
	}

	void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
		MutexLock lock(mutex);
		pump_task_id = p_task_id;
	}

	CommandQueueMT() {
		command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024);
	}
};
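
// Usage sketch (illustrative only; "RenderBackend", "draw", "resize",
// "get_frame_number" and "mesh_id" are hypothetical names, not part of
// Godot). A producer thread pushes commands while a consumer thread drains
// them; the #if 0 guard keeps this example out of compilation.
#if 0
CommandQueueMT queue;
RenderBackend backend;
uint32_t mesh_id = 42;

// Producer thread: fire-and-forget; returns as soon as the command is queued.
queue.push(&backend, &RenderBackend::draw, mesh_id);

// Producer thread: blocks until the consumer has executed the command.
queue.push_and_sync(&backend, &RenderBackend::resize, 1920, 1080);

// Producer thread: blocks until the consumer writes the return value.
uint64_t frame = 0;
queue.push_and_ret(&backend, &RenderBackend::get_frame_number, &frame);

// Consumer thread: executes everything queued so far.
queue.flush_if_pending();
#endif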