Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
godotengine
GitHub Repository: godotengine/godot
Path: blob/master/modules/multiplayer/scene_replication_interface.cpp
20904 views
1
/**************************************************************************/
2
/* scene_replication_interface.cpp */
3
/**************************************************************************/
4
/* This file is part of: */
5
/* GODOT ENGINE */
6
/* https://godotengine.org */
7
/**************************************************************************/
8
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
9
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
10
/* */
11
/* Permission is hereby granted, free of charge, to any person obtaining */
12
/* a copy of this software and associated documentation files (the */
13
/* "Software"), to deal in the Software without restriction, including */
14
/* without limitation the rights to use, copy, modify, merge, publish, */
15
/* distribute, sublicense, and/or sell copies of the Software, and to */
16
/* permit persons to whom the Software is furnished to do so, subject to */
17
/* the following conditions: */
18
/* */
19
/* The above copyright notice and this permission notice shall be */
20
/* included in all copies or substantial portions of the Software. */
21
/* */
22
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
23
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
24
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
25
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
26
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
27
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
28
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
29
/**************************************************************************/
30
31
#include "scene_replication_interface.h"
32
33
#include "scene_multiplayer.h"
34
35
#include "core/debugger/engine_debugger.h"
36
#include "core/io/marshalls.h"
37
#include "core/os/os.h"
38
#include "scene/main/node.h"
39
40
#define MAKE_ROOM(m_amount) \
41
if (packet_cache.size() < m_amount) \
42
packet_cache.resize(m_amount);
43
44
#ifdef DEBUG_ENABLED
45
_FORCE_INLINE_ void SceneReplicationInterface::_profile_node_data(const String &p_what, ObjectID p_id, int p_size) {
46
if (EngineDebugger::is_profiling("multiplayer:replication")) {
47
Array values = { p_what, p_id, p_size };
48
EngineDebugger::profiler_add_frame_data("multiplayer:replication", values);
49
}
50
}
51
#endif
52
53
SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) {
54
if (!tracked_nodes.has(p_id)) {
55
tracked_nodes[p_id] = TrackedNode(p_id);
56
Node *node = get_id_as<Node>(p_id);
57
node->connect(SceneStringName(tree_exited), callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT);
58
}
59
return tracked_nodes[p_id];
60
}
61
62
void SceneReplicationInterface::_untrack(const ObjectID &p_id) {
63
if (!tracked_nodes.has(p_id)) {
64
return;
65
}
66
uint32_t net_id = tracked_nodes[p_id].net_id;
67
uint32_t peer = tracked_nodes[p_id].remote_peer;
68
tracked_nodes.erase(p_id);
69
// If it was spawned by a remote, remove it from the received nodes.
70
if (peer && peers_info.has(peer)) {
71
peers_info[peer].recv_nodes.erase(net_id);
72
}
73
// If we spawned or synced it, we need to remove it from any peer it was sent to.
74
if (net_id || peer == 0) {
75
for (KeyValue<int, PeerInfo> &E : peers_info) {
76
E.value.spawn_nodes.erase(p_id);
77
}
78
}
79
}
80
81
void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) {
82
for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) {
83
Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr;
84
ERR_CONTINUE(!node);
85
node->queue_free();
86
}
87
}
88
89
bool SceneReplicationInterface::_has_authority(const Node *p_node) {
90
return multiplayer->has_multiplayer_peer() && p_node->get_multiplayer_authority() == multiplayer->get_unique_id();
91
}
92
93
void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) {
94
if (p_connected) {
95
peers_info[p_id] = PeerInfo();
96
for (const ObjectID &oid : spawned_nodes) {
97
_update_spawn_visibility(p_id, oid);
98
}
99
for (const ObjectID &oid : sync_nodes) {
100
_update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid));
101
}
102
} else {
103
ERR_FAIL_COND(!peers_info.has(p_id));
104
_free_remotes(peers_info[p_id]);
105
peers_info.erase(p_id);
106
}
107
}
108
109
void SceneReplicationInterface::on_reset() {
110
for (const KeyValue<int, PeerInfo> &E : peers_info) {
111
_free_remotes(E.value);
112
}
113
peers_info.clear();
114
// Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned.
115
for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) {
116
TrackedNode &tobj = E.value;
117
tobj.net_id = 0;
118
tobj.remote_peer = 0;
119
}
120
for (const ObjectID &oid : sync_nodes) {
121
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
122
ERR_CONTINUE(!sync);
123
sync->reset();
124
}
125
last_net_id = 0;
126
}
127
128
void SceneReplicationInterface::on_network_process() {
129
// Prevent endless stalling in case of unforeseen spawn errors.
130
if (spawn_queue.size()) {
131
ERR_PRINT("An error happened during last spawn, this usually means the 'ready' signal was not emitted by the spawned node.");
132
for (const ObjectID &oid : spawn_queue) {
133
Node *node = get_id_as<Node>(oid);
134
ERR_CONTINUE(!node);
135
if (node->is_connected(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready))) {
136
node->disconnect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready));
137
}
138
}
139
spawn_queue.clear();
140
}
141
142
// Process syncs.
143
uint64_t usec = OS::get_singleton()->get_ticks_usec();
144
for (KeyValue<int, PeerInfo> &E : peers_info) {
145
const HashSet<ObjectID> to_sync = E.value.sync_nodes;
146
if (to_sync.is_empty()) {
147
continue; // Nothing to sync
148
}
149
uint16_t sync_net_time = ++E.value.last_sent_sync;
150
_send_sync(E.key, to_sync, sync_net_time, usec);
151
_send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
152
}
153
}
154
155
Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) {
156
Node *node = Object::cast_to<Node>(p_obj);
157
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
158
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
159
ERR_FAIL_NULL_V(spawner, ERR_INVALID_PARAMETER);
160
// Track node.
161
const ObjectID oid = node->get_instance_id();
162
TrackedNode &tobj = _track(oid);
163
164
// Spawn state needs to be callected after "ready", but the spawn order follows "enter_tree".
165
ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE);
166
tobj.spawner = spawner->get_instance_id();
167
spawn_queue.insert(oid);
168
node->connect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready).bind(oid), Node::CONNECT_ONE_SHOT);
169
return OK;
170
}
171
172
void SceneReplicationInterface::_node_ready(const ObjectID &p_oid) {
173
ERR_FAIL_COND(!spawn_queue.has(p_oid)); // Bug.
174
175
// If we are a nested spawn, we need to wait until the parent is ready.
176
if (p_oid != *(spawn_queue.begin())) {
177
return;
178
}
179
180
for (const ObjectID &oid : spawn_queue) {
181
ERR_CONTINUE(!tracked_nodes.has(oid));
182
183
TrackedNode &tobj = tracked_nodes[oid];
184
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tobj.spawner);
185
ERR_CONTINUE(!spawner);
186
187
spawned_nodes.insert(oid);
188
if (_has_authority(spawner)) {
189
_update_spawn_visibility(0, oid);
190
}
191
}
192
spawn_queue.clear();
193
}
194
195
Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) {
196
Node *node = Object::cast_to<Node>(p_obj);
197
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
198
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
199
ERR_FAIL_COND_V(!p_obj || !spawner, ERR_INVALID_PARAMETER);
200
// Forcibly despawn to all peers that knowns me.
201
int len = 0;
202
Error err = _make_despawn_packet(node, len);
203
ERR_FAIL_COND_V(err != OK, ERR_BUG);
204
const ObjectID oid = p_obj->get_instance_id();
205
for (const KeyValue<int, PeerInfo> &E : peers_info) {
206
if (!E.value.spawn_nodes.has(oid)) {
207
continue;
208
}
209
_send_raw(packet_cache.ptr(), len, E.key, true);
210
}
211
// Also remove spawner tracking from the replication state.
212
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
213
TrackedNode &tobj = _track(oid);
214
ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER);
215
tobj.spawner = ObjectID();
216
spawned_nodes.erase(oid);
217
for (KeyValue<int, PeerInfo> &E : peers_info) {
218
E.value.spawn_nodes.erase(oid);
219
}
220
return OK;
221
}
222
223
Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) {
224
Node *node = Object::cast_to<Node>(p_obj);
225
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
226
MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
227
ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);
228
229
// Add to synchronizer list.
230
TrackedNode &tobj = _track(p_obj->get_instance_id());
231
const ObjectID sid = sync->get_instance_id();
232
tobj.synchronizers.insert(sid);
233
sync_nodes.insert(sid);
234
235
// Update visibility.
236
sync->connect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id()));
237
_update_sync_visibility(0, sync);
238
239
if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) {
240
// Try to apply synchronizer Net ID
241
ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during the \"_enter_tree\" callback of their multiplayer spawner.", sync->get_path()));
242
ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA);
243
uint32_t net_id = pending_sync_net_ids.front()->get();
244
pending_sync_net_ids.pop_front();
245
peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id();
246
sync->set_net_id(net_id);
247
248
// Try to apply spawn state (before ready).
249
if (pending_buffer_size > 0) {
250
ERR_FAIL_COND_V(!node || !sync->get_replication_config_ptr(), ERR_UNCONFIGURED);
251
int consumed = 0;
252
const List<NodePath> props = sync->get_replication_config_ptr()->get_spawn_properties();
253
Vector<Variant> vars;
254
vars.resize(props.size());
255
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed);
256
ERR_FAIL_COND_V(err, err);
257
if (consumed > 0) {
258
pending_buffer += consumed;
259
pending_buffer_size -= consumed;
260
err = MultiplayerSynchronizer::set_state(props, node, vars);
261
ERR_FAIL_COND_V(err, err);
262
}
263
}
264
}
265
return OK;
266
}
267
268
Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_config) {
269
Node *node = Object::cast_to<Node>(p_obj);
270
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
271
MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
272
ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);
273
sync->disconnect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed));
274
// Untrack synchronizer.
275
const ObjectID oid = node->get_instance_id();
276
const ObjectID sid = sync->get_instance_id();
277
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
278
TrackedNode &tobj = _track(oid);
279
tobj.synchronizers.erase(sid);
280
sync_nodes.erase(sid);
281
for (KeyValue<int, PeerInfo> &E : peers_info) {
282
E.value.sync_nodes.erase(sid);
283
E.value.last_watch_usecs.erase(sid);
284
if (sync->get_net_id()) {
285
E.value.recv_sync_ids.erase(sync->get_net_id());
286
}
287
}
288
return OK;
289
}
290
291
void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) {
292
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid);
293
ERR_FAIL_NULL(sync); // Bug.
294
Node *node = sync->get_root_node();
295
ERR_FAIL_NULL(node); // Bug.
296
const ObjectID oid = node->get_instance_id();
297
if (spawned_nodes.has(oid) && p_peer != multiplayer->get_unique_id()) {
298
_update_spawn_visibility(p_peer, oid);
299
}
300
_update_sync_visibility(p_peer, sync);
301
}
302
303
bool SceneReplicationInterface::is_rpc_visible(const ObjectID &p_oid, int p_peer) const {
304
if (!tracked_nodes.has(p_oid)) {
305
return true; // Untracked nodes are always visible to RPCs.
306
}
307
ERR_FAIL_COND_V(p_peer < 0, false);
308
const TrackedNode &tnode = tracked_nodes[p_oid];
309
if (tnode.synchronizers.is_empty()) {
310
return true; // No synchronizers means no visibility restrictions.
311
}
312
if (tnode.remote_peer && uint32_t(p_peer) == tnode.remote_peer) {
313
return true; // RPCs on spawned nodes are always visible to spawner.
314
} else if (spawned_nodes.has(p_oid)) {
315
// It's a spawned node we control, this can be fast.
316
if (p_peer) {
317
return peers_info.has(p_peer) && peers_info[p_peer].spawn_nodes.has(p_oid);
318
} else {
319
for (const KeyValue<int, PeerInfo> &E : peers_info) {
320
if (!E.value.spawn_nodes.has(p_oid)) {
321
return false; // Not public.
322
}
323
}
324
return true; // All peers have this node.
325
}
326
} else {
327
// Cycle object synchronizers to check visibility.
328
for (const ObjectID &sid : tnode.synchronizers) {
329
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
330
ERR_CONTINUE(!sync);
331
// RPC visibility is composed using OR when multiple synchronizers are present.
332
// Note that we don't really care about authority here which may lead to unexpected
333
// results when using multiple synchronizers to control the same node.
334
if (sync->is_visible_to(p_peer)) {
335
return true;
336
}
337
}
338
return false; // Not visible.
339
}
340
}
341
342
Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) {
343
ERR_FAIL_NULL_V(p_sync, ERR_BUG);
344
if (!_has_authority(p_sync) || p_peer == multiplayer->get_unique_id()) {
345
return OK;
346
}
347
348
const ObjectID &sid = p_sync->get_instance_id();
349
bool is_visible = p_sync->is_visible_to(p_peer);
350
if (p_peer == 0) {
351
for (KeyValue<int, PeerInfo> &E : peers_info) {
352
// Might be visible to this specific peer.
353
bool is_visible_to_peer = is_visible || p_sync->is_visible_to(E.key);
354
if (is_visible_to_peer == E.value.sync_nodes.has(sid)) {
355
continue;
356
}
357
if (is_visible_to_peer) {
358
E.value.sync_nodes.insert(sid);
359
} else {
360
E.value.sync_nodes.erase(sid);
361
E.value.last_watch_usecs.erase(sid);
362
}
363
}
364
return OK;
365
} else {
366
ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
367
if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) {
368
return OK;
369
}
370
if (is_visible) {
371
peers_info[p_peer].sync_nodes.insert(sid);
372
} else {
373
peers_info[p_peer].sync_nodes.erase(sid);
374
peers_info[p_peer].last_watch_usecs.erase(sid);
375
}
376
return OK;
377
}
378
}
379
380
Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) {
381
const TrackedNode *tnode = tracked_nodes.getptr(p_oid);
382
ERR_FAIL_NULL_V(tnode, ERR_BUG);
383
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner);
384
Node *node = get_id_as<Node>(p_oid);
385
ERR_FAIL_NULL_V(node, ERR_BUG);
386
ERR_FAIL_NULL_V(spawner, ERR_BUG);
387
ERR_FAIL_COND_V(!_has_authority(spawner), ERR_BUG);
388
ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG);
389
const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers;
390
bool is_visible = true;
391
for (const ObjectID &sid : synchronizers) {
392
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
393
ERR_CONTINUE(!sync);
394
if (!_has_authority(sync)) {
395
continue;
396
}
397
// Spawn visibility is composed using OR when multiple synchronizers are present.
398
if (sync->is_visible_to(p_peer)) {
399
is_visible = true;
400
break;
401
}
402
is_visible = false;
403
}
404
// Spawn (and despawn) when needed.
405
HashSet<int> to_spawn;
406
HashSet<int> to_despawn;
407
if (p_peer) {
408
ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
409
if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) {
410
return OK;
411
}
412
if (is_visible) {
413
to_spawn.insert(p_peer);
414
} else {
415
to_despawn.insert(p_peer);
416
}
417
} else {
418
// Check visibility for each peers.
419
for (const KeyValue<int, PeerInfo> &E : peers_info) {
420
if (is_visible) {
421
// This is fast, since the object is visible to everyone, we don't need to check each peer.
422
if (E.value.spawn_nodes.has(p_oid)) {
423
// Already spawned.
424
continue;
425
}
426
to_spawn.insert(E.key);
427
} else {
428
// Need to check visibility for each peer.
429
_update_spawn_visibility(E.key, p_oid);
430
}
431
}
432
}
433
if (to_spawn.size()) {
434
int len = 0;
435
_make_spawn_packet(node, spawner, len);
436
for (int pid : to_spawn) {
437
ERR_CONTINUE(!peers_info.has(pid));
438
int path_id;
439
multiplayer_cache->send_object_cache(spawner, pid, path_id);
440
_send_raw(packet_cache.ptr(), len, pid, true);
441
peers_info[pid].spawn_nodes.insert(p_oid);
442
}
443
}
444
if (to_despawn.size()) {
445
int len = 0;
446
_make_despawn_packet(node, len);
447
for (int pid : to_despawn) {
448
ERR_CONTINUE(!peers_info.has(pid));
449
peers_info[pid].spawn_nodes.erase(p_oid);
450
_send_raw(packet_cache.ptr(), len, pid, true);
451
}
452
}
453
return OK;
454
}
455
456
Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, int p_peer, bool p_reliable) {
457
ERR_FAIL_COND_V(!p_buffer || p_size < 1, ERR_INVALID_PARAMETER);
458
459
Ref<MultiplayerPeer> peer = multiplayer->get_multiplayer_peer();
460
ERR_FAIL_COND_V(peer.is_null(), ERR_UNCONFIGURED);
461
peer->set_transfer_channel(0);
462
peer->set_transfer_mode(p_reliable ? MultiplayerPeer::TRANSFER_MODE_RELIABLE : MultiplayerPeer::TRANSFER_MODE_UNRELIABLE);
463
return multiplayer->send_command(p_peer, p_buffer, p_size);
464
}
465
466
Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) {
467
ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG);
468
469
const ObjectID oid = p_node->get_instance_id();
470
TrackedNode *tnode = tracked_nodes.getptr(oid);
471
ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);
472
473
if (tnode->net_id == 0) {
474
// Ensure the node has an ID.
475
tnode->net_id = ++last_net_id;
476
}
477
uint32_t nid = tnode->net_id;
478
ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED);
479
480
// Prepare custom arg and scene_id
481
uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid);
482
bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID;
483
Variant spawn_arg = p_spawner->get_spawn_argument(oid);
484
int spawn_arg_size = 0;
485
if (is_custom) {
486
Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false);
487
ERR_FAIL_COND_V(err, err);
488
}
489
490
// Prepare spawn state.
491
List<NodePath> state_props;
492
List<uint32_t> sync_ids;
493
const HashSet<ObjectID> synchronizers = tnode->synchronizers;
494
for (const ObjectID &sid : synchronizers) {
495
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
496
if (!_has_authority(sync)) {
497
continue;
498
}
499
ERR_CONTINUE(!sync);
500
ERR_FAIL_NULL_V(sync->get_replication_config_ptr(), ERR_BUG);
501
for (const NodePath &prop : sync->get_replication_config_ptr()->get_spawn_properties()) {
502
state_props.push_back(prop);
503
}
504
// Ensure the synchronizer has an ID.
505
if (sync->get_net_id() == 0) {
506
sync->set_net_id(++last_net_id);
507
}
508
sync_ids.push_back(sync->get_net_id());
509
}
510
int state_size = 0;
511
Vector<Variant> state_vars;
512
Vector<const Variant *> state_varp;
513
if (state_props.size()) {
514
Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp);
515
ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state.");
516
err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size);
517
ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state.");
518
}
519
520
// Encode scene ID, path ID, net ID, node name.
521
int path_id = multiplayer_cache->make_object_cache(p_spawner);
522
CharString cname = p_node->get_name().operator String().utf8();
523
int nlen = encode_cstring(cname.get_data(), nullptr);
524
MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size);
525
uint8_t *ptr = packet_cache.ptrw();
526
ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN;
527
ptr[1] = scene_id;
528
int ofs = 2;
529
ofs += encode_uint32(path_id, &ptr[ofs]);
530
ofs += encode_uint32(nid, &ptr[ofs]);
531
ofs += encode_uint32(sync_ids.size(), &ptr[ofs]);
532
ofs += encode_uint32(nlen, &ptr[ofs]);
533
for (uint32_t snid : sync_ids) {
534
ofs += encode_uint32(snid, &ptr[ofs]);
535
}
536
ofs += encode_cstring(cname.get_data(), &ptr[ofs]);
537
// Write args
538
if (is_custom) {
539
ofs += encode_uint32(spawn_arg_size, &ptr[ofs]);
540
Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, &ptr[ofs], spawn_arg_size, false);
541
ERR_FAIL_COND_V(err, err);
542
ofs += spawn_arg_size;
543
}
544
// Write state.
545
if (state_size) {
546
Error err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), &ptr[ofs], state_size);
547
ERR_FAIL_COND_V(err, err);
548
ofs += state_size;
549
}
550
r_len = ofs;
551
return OK;
552
}
553
554
Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) {
555
const ObjectID oid = p_node->get_instance_id();
556
const TrackedNode *tnode = tracked_nodes.getptr(oid);
557
ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);
558
MAKE_ROOM(5);
559
uint8_t *ptr = packet_cache.ptrw();
560
ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN;
561
int ofs = 1;
562
uint32_t nid = tnode->net_id;
563
ofs += encode_uint32(nid, &ptr[ofs]);
564
r_len = ofs;
565
return OK;
566
}
567
568
Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
569
ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received");
570
int ofs = 1; // The spawn/despawn command.
571
uint8_t scene_id = p_buffer[ofs];
572
ofs += 1;
573
uint32_t node_target = decode_uint32(&p_buffer[ofs]);
574
ofs += 4;
575
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(multiplayer_cache->get_cached_object(p_from, node_target));
576
ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);
577
ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
578
579
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
580
ofs += 4;
581
uint32_t sync_len = decode_uint32(&p_buffer[ofs]);
582
ofs += 4;
583
uint32_t name_len = decode_uint32(&p_buffer[ofs]);
584
ofs += 4;
585
ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4)));
586
List<uint32_t> sync_ids;
587
for (uint32_t i = 0; i < sync_len; i++) {
588
sync_ids.push_back(decode_uint32(&p_buffer[ofs]));
589
ofs += 4;
590
}
591
ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size.");
592
593
// We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names.
594
const String name = String::utf8((const char *)&p_buffer[ofs], name_len);
595
ERR_FAIL_COND_V_MSG(name.validate_node_name() != name, ERR_INVALID_DATA, vformat("Invalid node name received: '%s'. Make sure to add nodes via 'add_child(node, true)' remotely.", name));
596
ofs += name_len;
597
598
// Check that we can spawn.
599
Node *parent = spawner->get_node_or_null(spawner->get_spawn_path());
600
ERR_FAIL_NULL_V(parent, ERR_UNCONFIGURED);
601
ERR_FAIL_COND_V(parent->has_node(name), ERR_INVALID_DATA);
602
603
Node *node = nullptr;
604
if (scene_id == MultiplayerSpawner::INVALID_ID) {
605
// Custom spawn.
606
ERR_FAIL_COND_V(p_buffer_len - ofs < 4, ERR_INVALID_DATA);
607
uint32_t arg_size = decode_uint32(&p_buffer[ofs]);
608
ofs += 4;
609
ERR_FAIL_COND_V(arg_size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
610
Variant v;
611
Error err = MultiplayerAPI::decode_and_decompress_variant(v, &p_buffer[ofs], arg_size, nullptr, false);
612
ERR_FAIL_COND_V(err != OK, err);
613
ofs += arg_size;
614
node = spawner->instantiate_custom(v);
615
} else {
616
// Scene based spawn.
617
node = spawner->instantiate_scene(scene_id);
618
}
619
ERR_FAIL_NULL_V(node, ERR_UNAUTHORIZED);
620
node->set_name(name);
621
622
// Add and track remote
623
ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE);
624
ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE);
625
ObjectID oid = node->get_instance_id();
626
TrackedNode &tobj = _track(oid);
627
tobj.spawner = spawner->get_instance_id();
628
tobj.net_id = net_id;
629
tobj.remote_peer = p_from;
630
peers_info[p_from].recv_nodes[net_id] = oid;
631
632
// The initial state will be applied during the sync config (i.e. before _ready).
633
pending_spawn = node->get_instance_id();
634
pending_spawn_remote = p_from;
635
pending_buffer_size = p_buffer_len - ofs;
636
pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr;
637
pending_sync_net_ids = sync_ids;
638
639
parent->add_child(node);
640
spawner->emit_signal(SNAME("spawned"), node);
641
642
pending_spawn = ObjectID();
643
pending_spawn_remote = 0;
644
pending_buffer = nullptr;
645
pending_buffer_size = 0;
646
if (pending_sync_net_ids.size()) {
647
pending_sync_net_ids.clear();
648
ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed.
649
}
650
return OK;
651
}
652
653
Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
654
ERR_FAIL_COND_V_MSG(p_buffer_len < 5, ERR_INVALID_DATA, "Invalid spawn packet received");
655
int ofs = 1; // The spawn/despawn command.
656
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
657
ofs += 4;
658
659
// Untrack remote
660
ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED);
661
PeerInfo &pinfo = peers_info[p_from];
662
ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED);
663
Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]);
664
ERR_FAIL_NULL_V(node, ERR_BUG);
665
pinfo.recv_nodes.erase(net_id);
666
667
const ObjectID oid = node->get_instance_id();
668
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG);
669
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner);
670
ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);
671
ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
672
673
if (node->get_parent() != nullptr) {
674
node->get_parent()->remove_child(node);
675
}
676
node->queue_free();
677
spawner->emit_signal(SNAME("despawned"), node);
678
679
return OK;
680
}
681
682
bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
683
r_net_id = p_sync->get_net_id();
684
if (r_net_id == 0 || (r_net_id & 0x80000000)) {
685
int path_id = 0;
686
bool verified = multiplayer_cache->send_object_cache(p_sync, p_peer, path_id);
687
ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
688
if (r_net_id == 0) {
689
// First time path based ID.
690
r_net_id = path_id | 0x80000000;
691
p_sync->set_net_id(r_net_id | 0x80000000);
692
}
693
return verified;
694
}
695
return true;
696
}
697
698
MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
699
MultiplayerSynchronizer *sync = nullptr;
700
if (p_net_id & 0x80000000) {
701
sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer_cache->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
702
} else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
703
const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
704
sync = get_id_as<MultiplayerSynchronizer>(sid);
705
}
706
return sync;
707
}
708
709
void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> &p_last_watch_usecs) {
710
MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
711
uint8_t *ptr = packet_cache.ptrw();
712
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
713
int ofs = 1;
714
for (const ObjectID &oid : p_synchronizers) {
715
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
716
ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));
717
uint32_t net_id;
718
if (!_verify_synchronizer(p_peer, sync, net_id)) {
719
continue;
720
}
721
uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
722
uint64_t indexes;
723
List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
724
725
if (!delta.size()) {
726
continue; // Nothing to update.
727
}
728
729
Vector<const Variant *> varp;
730
varp.resize(delta.size());
731
const Variant **vptr = varp.ptrw();
732
int i = 0;
733
for (const Variant &v : delta) {
734
vptr[i] = &v;
735
i++;
736
}
737
int size;
738
Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
739
ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
740
741
ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
742
743
if (ofs + 4 + 8 + 4 + size > delta_mtu) {
744
// Send what we got, and reset write.
745
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
746
ofs = 1;
747
}
748
if (size) {
749
ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
750
ofs += encode_uint64(indexes, &ptr[ofs]);
751
ofs += encode_uint32(size, &ptr[ofs]);
752
MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
753
ofs += size;
754
}
755
#ifdef DEBUG_ENABLED
756
_profile_node_data("delta_out", oid, size);
757
#endif
758
peers_info[p_peer].last_watch_usecs[oid] = p_usec;
759
}
760
if (ofs > 1) {
761
// Got some left over to send.
762
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
763
}
764
}
765
766
Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
767
int ofs = 1;
768
while (ofs + 4 + 8 + 4 < p_buffer_len) {
769
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
770
ofs += 4;
771
uint64_t indexes = decode_uint64(&p_buffer[ofs]);
772
ofs += 8;
773
uint32_t size = decode_uint32(&p_buffer[ofs]);
774
ofs += 4;
775
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
776
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
777
Node *node = sync ? sync->get_root_node() : nullptr;
778
if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
779
ofs += size;
780
ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
781
}
782
List<NodePath> props = sync->get_delta_properties(indexes);
783
ERR_FAIL_COND_V(props.is_empty(), ERR_INVALID_DATA);
784
Vector<Variant> vars;
785
vars.resize(props.size());
786
int consumed = 0;
787
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
788
ERR_FAIL_COND_V(err != OK, err);
789
ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
790
err = MultiplayerSynchronizer::set_state(props, node, vars);
791
ERR_FAIL_COND_V(err != OK, err);
792
ofs += size;
793
sync->emit_signal(SNAME("delta_synchronized"));
794
#ifdef DEBUG_ENABLED
795
_profile_node_data("delta_in", sync->get_instance_id(), size);
796
#endif
797
}
798
return OK;
799
}
800
801
void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
802
MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
803
uint8_t *ptr = packet_cache.ptrw();
804
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
805
int ofs = 1;
806
ofs += encode_uint16(p_sync_net_time, &ptr[1]);
807
// Can only send updates for already notified nodes.
808
// This is a lazy implementation, we could optimize much more here with by grouping by replication config.
809
for (const ObjectID &oid : p_synchronizers) {
810
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
811
ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));
812
if (!sync->update_outbound_sync_time(p_usec)) {
813
continue; // nothing to sync.
814
}
815
816
Node *node = sync->get_root_node();
817
ERR_CONTINUE(!node);
818
uint32_t net_id = sync->get_net_id();
819
if (!_verify_synchronizer(p_peer, sync, net_id)) {
820
// The path based sync is not yet confirmed, skipping.
821
continue;
822
}
823
int size;
824
Vector<Variant> vars;
825
Vector<const Variant *> varp;
826
const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();
827
Error err = MultiplayerSynchronizer::get_state(props, node, vars, varp);
828
ERR_CONTINUE_MSG(err != OK, "Unable to retrieve sync state.");
829
err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
830
ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
831
// TODO Handle single state above MTU.
832
ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
833
if (ofs + 4 + 4 + size > sync_mtu) {
834
// Send what we got, and reset write.
835
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
836
ofs = 3;
837
}
838
if (size) {
839
ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
840
ofs += encode_uint32(size, &ptr[ofs]);
841
MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size);
842
ofs += size;
843
}
844
#ifdef DEBUG_ENABLED
845
_profile_node_data("sync_out", oid, size);
846
#endif
847
}
848
if (ofs > 3) {
849
// Got some left over to send.
850
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
851
}
852
}
853
854
Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
855
ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
856
bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
857
if (is_delta) {
858
return on_delta_receive(p_from, p_buffer, p_buffer_len);
859
}
860
uint16_t time = decode_uint16(&p_buffer[1]);
861
int ofs = 3;
862
while (ofs + 8 < p_buffer_len) {
863
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
864
ofs += 4;
865
uint32_t size = decode_uint32(&p_buffer[ofs]);
866
ofs += 4;
867
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
868
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
869
if (!sync) {
870
// Not received yet.
871
ofs += size;
872
continue;
873
}
874
Node *node = sync->get_root_node();
875
if (sync->get_multiplayer_authority() != p_from || !node) {
876
// Not valid for me.
877
ofs += size;
878
ERR_CONTINUE_MSG(true, "Ignoring sync data from non-authority or for missing node.");
879
}
880
if (!sync->update_inbound_sync_time(time)) {
881
// State is too old.
882
ofs += size;
883
continue;
884
}
885
const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();
886
Vector<Variant> vars;
887
vars.resize(props.size());
888
int consumed;
889
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, &p_buffer[ofs], size, consumed);
890
ERR_FAIL_COND_V(err, err);
891
err = MultiplayerSynchronizer::set_state(props, node, vars);
892
ERR_FAIL_COND_V(err, err);
893
ofs += size;
894
sync->emit_signal(SNAME("synchronized"));
895
#ifdef DEBUG_ENABLED
896
_profile_node_data("sync_in", sync->get_instance_id(), size);
897
#endif
898
}
899
return OK;
900
}
901
902
void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
903
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
904
sync_mtu = p_size;
905
}
906
907
int SceneReplicationInterface::get_max_sync_packet_size() const {
908
return sync_mtu;
909
}
910
911
void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
912
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
913
delta_mtu = p_size;
914
}
915
916
int SceneReplicationInterface::get_max_delta_packet_size() const {
917
return delta_mtu;
918
}
919
920