// Source: godotengine/godot — core/templates/command_queue_mt.h (master branch).
/**************************************************************************/
/*  command_queue_mt.h                                                    */
/**************************************************************************/
/*                         This file is part of:                          */
/*                             GODOT ENGINE                               */
/*                        https://godotengine.org                         */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */
/*                                                                        */
/* Permission is hereby granted, free of charge, to any person obtaining  */
/* a copy of this software and associated documentation files (the        */
/* "Software"), to deal in the Software without restriction, including    */
/* without limitation the rights to use, copy, modify, merge, publish,    */
/* distribute, sublicense, and/or sell copies of the Software, and to     */
/* permit persons to whom the Software is furnished to do so, subject to  */
/* the following conditions:                                              */
/*                                                                        */
/* The above copyright notice and this permission notice shall be         */
/* included in all copies or substantial portions of the Software.        */
/*                                                                        */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */
/**************************************************************************/

#pragma once

#include "core/object/worker_thread_pool.h"
#include "core/os/condition_variable.h"
#include "core/os/mutex.h"
#include "core/templates/local_vector.h"
#include "core/templates/simple_type.h"
#include "core/templates/tuple.h"
#include "core/typedefs.h"

class CommandQueueMT {
42
struct CommandBase {
43
bool sync = false;
44
virtual void call() = 0;
45
virtual ~CommandBase() = default;
46
47
CommandBase(bool p_sync) :
48
sync(p_sync) {}
49
};
50
51
template <typename T, typename M, bool NeedsSync, typename... Args>
52
struct Command : public CommandBase {
53
T *instance;
54
M method;
55
Tuple<GetSimpleTypeT<Args>...> args;
56
57
template <typename... FwdArgs>
58
_FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
59
CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}
60
61
void call() {
62
call_impl(BuildIndexSequence<sizeof...(Args)>{});
63
}
64
65
private:
66
template <size_t... I>
67
_FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
68
// Move out of the Tuple, this will be destroyed as soon as the call is complete.
69
(instance->*method)(std::move(get<I>())...);
70
}
71
72
// This method exists so we can call it in the parameter pack expansion in call_impl.
73
template <size_t I>
74
_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
75
};
76
77
// Separate class from Command so we can save the space of the ret pointer for commands that don't return.
78
template <typename T, typename M, typename R, typename... Args>
79
struct CommandRet : public CommandBase {
80
T *instance;
81
M method;
82
R *ret;
83
Tuple<GetSimpleTypeT<Args>...> args;
84
85
_FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
86
CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}
87
88
void call() override {
89
*ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
90
}
91
92
private:
93
template <size_t... I>
94
_FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
95
// Move out of the Tuple, this will be destroyed as soon as the call is complete.
96
return (instance->*method)(std::move(get<I>())...);
97
}
98
99
// This method exists so we can call it in the parameter pack expansion in call_impl.
100
template <size_t I>
101
_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
102
};
103
104
/***** BASE *******/
105
106
static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
107
108
BinaryMutex mutex;
109
LocalVector<uint8_t> command_mem;
110
ConditionVariable sync_cond_var;
111
uint32_t sync_head = 0;
112
uint32_t sync_tail = 0;
113
uint32_t sync_awaiters = 0;
114
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
115
uint64_t flush_read_ptr = 0;
116
std::atomic<bool> pending{ false };
117
118
template <typename T, typename... Args>
119
_FORCE_INLINE_ void create_command(Args &&...p_args) {
120
// alloc size is size+T+safeguard
121
constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
122
static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");
123
124
uint64_t size = command_mem.size();
125
command_mem.resize(size + alloc_size + sizeof(uint64_t));
126
*(uint64_t *)&command_mem[size] = alloc_size;
127
void *cmd = &command_mem[size + sizeof(uint64_t)];
128
new (cmd) T(std::forward<Args>(p_args)...);
129
pending.store(true);
130
}
131
132
template <typename T, bool NeedsSync, typename... Args>
133
_FORCE_INLINE_ void _push_internal(Args &&...args) {
134
MutexLock mlock(mutex);
135
create_command<T>(std::forward<Args>(args)...);
136
137
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
138
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
139
}
140
141
if constexpr (NeedsSync) {
142
sync_tail++;
143
_wait_for_sync(mlock);
144
}
145
}
146
147
_FORCE_INLINE_ void _prevent_sync_wraparound() {
148
bool safe_to_reset = !sync_awaiters;
149
bool already_sync_to_latest = sync_head == sync_tail;
150
if (safe_to_reset && already_sync_to_latest) {
151
sync_head = 0;
152
sync_tail = 0;
153
}
154
}
155
156
void _flush() {
157
if (unlikely(flush_read_ptr)) {
158
// Re-entrant call.
159
return;
160
}
161
162
MutexLock lock(mutex);
163
164
while (flush_read_ptr < command_mem.size()) {
165
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
166
flush_read_ptr += 8;
167
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
168
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
169
cmd->call();
170
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
171
172
// Handle potential realloc due to the command and unlock allowance.
173
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
174
175
if (unlikely(cmd->sync)) {
176
sync_head++;
177
lock.~MutexLock(); // Give an opportunity to awaiters right away.
178
sync_cond_var.notify_all();
179
new (&lock) MutexLock(mutex);
180
// Handle potential realloc happened during unlock.
181
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
182
}
183
184
cmd->~CommandBase();
185
186
flush_read_ptr += size;
187
}
188
189
command_mem.clear();
190
pending.store(false);
191
flush_read_ptr = 0;
192
193
_prevent_sync_wraparound();
194
}
195
196
_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
197
sync_awaiters++;
198
uint32_t sync_head_goal = sync_tail;
199
do {
200
sync_cond_var.wait(p_lock);
201
} while (sync_head < sync_head_goal);
202
sync_awaiters--;
203
_prevent_sync_wraparound();
204
}
205
206
void _no_op() {}
207
208
public:
209
template <typename T, typename M, typename... Args>
210
void push(T *p_instance, M p_method, Args &&...p_args) {
211
// Standard command, no sync.
212
using CommandType = Command<T, M, false, Args...>;
213
_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
214
}
215
216
template <typename T, typename M, typename... Args>
217
void push_and_sync(T *p_instance, M p_method, Args... p_args) {
218
// Standard command, sync.
219
using CommandType = Command<T, M, true, Args...>;
220
_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
221
}
222
223
template <typename T, typename M, typename R, typename... Args>
224
void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
225
// Command with return value, sync.
226
using CommandType = CommandRet<T, M, R, Args...>;
227
_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
228
}
229
230
_FORCE_INLINE_ void flush_if_pending() {
231
if (unlikely(pending.load())) {
232
_flush();
233
}
234
}
235
236
void flush_all() {
237
_flush();
238
}
239
240
void sync() {
241
push_and_sync(this, &CommandQueueMT::_no_op);
242
}
243
244
void wait_and_flush() {
245
ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
246
WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
247
_flush();
248
}
249
250
void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
251
MutexLock lock(mutex);
252
pump_task_id = p_task_id;
253
}
254
255
CommandQueueMT();
256
~CommandQueueMT();
257
};
258
259