Path: blob/master/modules/multiplayer/scene_replication_interface.cpp
20904 views
/**************************************************************************/1/* scene_replication_interface.cpp */2/**************************************************************************/3/* This file is part of: */4/* GODOT ENGINE */5/* https://godotengine.org */6/**************************************************************************/7/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */8/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */9/* */10/* Permission is hereby granted, free of charge, to any person obtaining */11/* a copy of this software and associated documentation files (the */12/* "Software"), to deal in the Software without restriction, including */13/* without limitation the rights to use, copy, modify, merge, publish, */14/* distribute, sublicense, and/or sell copies of the Software, and to */15/* permit persons to whom the Software is furnished to do so, subject to */16/* the following conditions: */17/* */18/* The above copyright notice and this permission notice shall be */19/* included in all copies or substantial portions of the Software. */20/* */21/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */22/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */23/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */24/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */25/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */26/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */27/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */28/**************************************************************************/2930#include "scene_replication_interface.h"3132#include "scene_multiplayer.h"3334#include "core/debugger/engine_debugger.h"35#include "core/io/marshalls.h"36#include "core/os/os.h"37#include "scene/main/node.h"3839#define MAKE_ROOM(m_amount) \40if (packet_cache.size() < m_amount) \41packet_cache.resize(m_amount);4243#ifdef DEBUG_ENABLED44_FORCE_INLINE_ void SceneReplicationInterface::_profile_node_data(const String &p_what, ObjectID p_id, int p_size) {45if (EngineDebugger::is_profiling("multiplayer:replication")) {46Array values = { p_what, p_id, p_size };47EngineDebugger::profiler_add_frame_data("multiplayer:replication", values);48}49}50#endif5152SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) {53if (!tracked_nodes.has(p_id)) {54tracked_nodes[p_id] = TrackedNode(p_id);55Node *node = get_id_as<Node>(p_id);56node->connect(SceneStringName(tree_exited), callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT);57}58return tracked_nodes[p_id];59}6061void SceneReplicationInterface::_untrack(const ObjectID &p_id) {62if (!tracked_nodes.has(p_id)) {63return;64}65uint32_t net_id = tracked_nodes[p_id].net_id;66uint32_t peer = tracked_nodes[p_id].remote_peer;67tracked_nodes.erase(p_id);68// If it was spawned by a remote, remove it from the received nodes.69if (peer && peers_info.has(peer)) {70peers_info[peer].recv_nodes.erase(net_id);71}72// If we spawned or synced it, we need to remove it from any peer it was sent to.73if (net_id || peer == 0) {74for (KeyValue<int, PeerInfo> &E : peers_info) {75E.value.spawn_nodes.erase(p_id);76}77}78}7980void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) {81for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) {82Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr;83ERR_CONTINUE(!node);84node->queue_free();85}86}8788bool SceneReplicationInterface::_has_authority(const Node *p_node) {89return multiplayer->has_multiplayer_peer() && p_node->get_multiplayer_authority() == multiplayer->get_unique_id();90}9192void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) {93if (p_connected) {94peers_info[p_id] = PeerInfo();95for (const ObjectID &oid : spawned_nodes) {96_update_spawn_visibility(p_id, oid);97}98for (const ObjectID &oid : sync_nodes) {99_update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid));100}101} else {102ERR_FAIL_COND(!peers_info.has(p_id));103_free_remotes(peers_info[p_id]);104peers_info.erase(p_id);105}106}107108void SceneReplicationInterface::on_reset() {109for (const KeyValue<int, PeerInfo> &E : peers_info) {110_free_remotes(E.value);111}112peers_info.clear();113// Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned.114for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) {115TrackedNode &tobj = E.value;116tobj.net_id = 0;117tobj.remote_peer = 0;118}119for (const ObjectID &oid : sync_nodes) {120MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);121ERR_CONTINUE(!sync);122sync->reset();123}124last_net_id = 0;125}126127void SceneReplicationInterface::on_network_process() {128// Prevent endless stalling in case of unforeseen spawn errors.129if (spawn_queue.size()) {130ERR_PRINT("An error happened during last spawn, this usually means the 'ready' signal was not emitted by the spawned node.");131for (const ObjectID &oid : spawn_queue) {132Node *node = get_id_as<Node>(oid);133ERR_CONTINUE(!node);134if (node->is_connected(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready))) {135node->disconnect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready));136}137}138spawn_queue.clear();139}140141// Process syncs.142uint64_t usec = OS::get_singleton()->get_ticks_usec();143for (KeyValue<int, PeerInfo> &E : peers_info) {144const HashSet<ObjectID> to_sync = E.value.sync_nodes;145if (to_sync.is_empty()) {146continue; // Nothing to sync147}148uint16_t sync_net_time = ++E.value.last_sent_sync;149_send_sync(E.key, to_sync, sync_net_time, usec);150_send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);151}152}153154Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) {155Node *node = Object::cast_to<Node>(p_obj);156ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);157MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());158ERR_FAIL_NULL_V(spawner, ERR_INVALID_PARAMETER);159// Track node.160const ObjectID oid = node->get_instance_id();161TrackedNode &tobj = _track(oid);162163// Spawn state needs to be callected after "ready", but the spawn order follows "enter_tree".164ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE);165tobj.spawner = spawner->get_instance_id();166spawn_queue.insert(oid);167node->connect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready).bind(oid), Node::CONNECT_ONE_SHOT);168return OK;169}170171void SceneReplicationInterface::_node_ready(const ObjectID &p_oid) {172ERR_FAIL_COND(!spawn_queue.has(p_oid)); // Bug.173174// If we are a nested spawn, we need to wait until the parent is ready.175if (p_oid != *(spawn_queue.begin())) {176return;177}178179for (const ObjectID &oid : spawn_queue) {180ERR_CONTINUE(!tracked_nodes.has(oid));181182TrackedNode &tobj = tracked_nodes[oid];183MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tobj.spawner);184ERR_CONTINUE(!spawner);185186spawned_nodes.insert(oid);187if (_has_authority(spawner)) {188_update_spawn_visibility(0, oid);189}190}191spawn_queue.clear();192}193194Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) {195Node *node = Object::cast_to<Node>(p_obj);196ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);197MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());198ERR_FAIL_COND_V(!p_obj || !spawner, ERR_INVALID_PARAMETER);199// Forcibly despawn to all peers that knowns me.200int len = 0;201Error err = _make_despawn_packet(node, len);202ERR_FAIL_COND_V(err != OK, ERR_BUG);203const ObjectID oid = p_obj->get_instance_id();204for (const KeyValue<int, PeerInfo> &E : peers_info) {205if (!E.value.spawn_nodes.has(oid)) {206continue;207}208_send_raw(packet_cache.ptr(), len, E.key, true);209}210// Also remove spawner tracking from the replication state.211ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);212TrackedNode &tobj = _track(oid);213ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER);214tobj.spawner = ObjectID();215spawned_nodes.erase(oid);216for (KeyValue<int, PeerInfo> &E : peers_info) {217E.value.spawn_nodes.erase(oid);218}219return OK;220}221222Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) {223Node *node = Object::cast_to<Node>(p_obj);224ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);225MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());226ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);227228// Add to synchronizer list.229TrackedNode &tobj = _track(p_obj->get_instance_id());230const ObjectID sid = sync->get_instance_id();231tobj.synchronizers.insert(sid);232sync_nodes.insert(sid);233234// Update visibility.235sync->connect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id()));236_update_sync_visibility(0, sync);237238if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) {239// Try to apply synchronizer Net ID240ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during the \"_enter_tree\" callback of their multiplayer spawner.", sync->get_path()));241ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA);242uint32_t net_id = pending_sync_net_ids.front()->get();243pending_sync_net_ids.pop_front();244peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id();245sync->set_net_id(net_id);246247// Try to apply spawn state (before ready).248if (pending_buffer_size > 0) {249ERR_FAIL_COND_V(!node || !sync->get_replication_config_ptr(), ERR_UNCONFIGURED);250int consumed = 0;251const List<NodePath> props = sync->get_replication_config_ptr()->get_spawn_properties();252Vector<Variant> vars;253vars.resize(props.size());254Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed);255ERR_FAIL_COND_V(err, err);256if (consumed > 0) {257pending_buffer += consumed;258pending_buffer_size -= consumed;259err = MultiplayerSynchronizer::set_state(props, node, vars);260ERR_FAIL_COND_V(err, err);261}262}263}264return OK;265}266267Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_config) {268Node *node = Object::cast_to<Node>(p_obj);269ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);270MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());271ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);272sync->disconnect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed));273// Untrack synchronizer.274const ObjectID oid = node->get_instance_id();275const ObjectID sid = sync->get_instance_id();276ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);277TrackedNode &tobj = _track(oid);278tobj.synchronizers.erase(sid);279sync_nodes.erase(sid);280for (KeyValue<int, PeerInfo> &E : peers_info) {281E.value.sync_nodes.erase(sid);282E.value.last_watch_usecs.erase(sid);283if (sync->get_net_id()) {284E.value.recv_sync_ids.erase(sync->get_net_id());285}286}287return OK;288}289290void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) {291MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid);292ERR_FAIL_NULL(sync); // Bug.293Node *node = sync->get_root_node();294ERR_FAIL_NULL(node); // Bug.295const ObjectID oid = node->get_instance_id();296if (spawned_nodes.has(oid) && p_peer != multiplayer->get_unique_id()) {297_update_spawn_visibility(p_peer, oid);298}299_update_sync_visibility(p_peer, sync);300}301302bool SceneReplicationInterface::is_rpc_visible(const ObjectID &p_oid, int p_peer) const {303if (!tracked_nodes.has(p_oid)) {304return true; // Untracked nodes are always visible to RPCs.305}306ERR_FAIL_COND_V(p_peer < 0, false);307const TrackedNode &tnode = tracked_nodes[p_oid];308if (tnode.synchronizers.is_empty()) {309return true; // No synchronizers means no visibility restrictions.310}311if (tnode.remote_peer && uint32_t(p_peer) == tnode.remote_peer) {312return true; // RPCs on spawned nodes are always visible to spawner.313} else if (spawned_nodes.has(p_oid)) {314// It's a spawned node we control, this can be fast.315if (p_peer) {316return peers_info.has(p_peer) && peers_info[p_peer].spawn_nodes.has(p_oid);317} else {318for (const KeyValue<int, PeerInfo> &E : peers_info) {319if (!E.value.spawn_nodes.has(p_oid)) {320return false; // Not public.321}322}323return true; // All peers have this node.324}325} else {326// Cycle object synchronizers to check visibility.327for (const ObjectID &sid : tnode.synchronizers) {328MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);329ERR_CONTINUE(!sync);330// RPC visibility is composed using OR when multiple synchronizers are present.331// Note that we don't really care about authority here which may lead to unexpected332// results when using multiple synchronizers to control the same node.333if (sync->is_visible_to(p_peer)) {334return true;335}336}337return false; // Not visible.338}339}340341Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) {342ERR_FAIL_NULL_V(p_sync, ERR_BUG);343if (!_has_authority(p_sync) || p_peer == multiplayer->get_unique_id()) {344return OK;345}346347const ObjectID &sid = p_sync->get_instance_id();348bool is_visible = p_sync->is_visible_to(p_peer);349if (p_peer == 0) {350for (KeyValue<int, PeerInfo> &E : peers_info) {351// Might be visible to this specific peer.352bool is_visible_to_peer = is_visible || p_sync->is_visible_to(E.key);353if (is_visible_to_peer == E.value.sync_nodes.has(sid)) {354continue;355}356if (is_visible_to_peer) {357E.value.sync_nodes.insert(sid);358} else {359E.value.sync_nodes.erase(sid);360E.value.last_watch_usecs.erase(sid);361}362}363return OK;364} else {365ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);366if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) {367return OK;368}369if (is_visible) {370peers_info[p_peer].sync_nodes.insert(sid);371} else {372peers_info[p_peer].sync_nodes.erase(sid);373peers_info[p_peer].last_watch_usecs.erase(sid);374}375return OK;376}377}378379Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) {380const TrackedNode *tnode = tracked_nodes.getptr(p_oid);381ERR_FAIL_NULL_V(tnode, ERR_BUG);382MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner);383Node *node = get_id_as<Node>(p_oid);384ERR_FAIL_NULL_V(node, ERR_BUG);385ERR_FAIL_NULL_V(spawner, ERR_BUG);386ERR_FAIL_COND_V(!_has_authority(spawner), ERR_BUG);387ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG);388const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers;389bool is_visible = true;390for (const ObjectID &sid : synchronizers) {391MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);392ERR_CONTINUE(!sync);393if (!_has_authority(sync)) {394continue;395}396// Spawn visibility is composed using OR when multiple synchronizers are present.397if (sync->is_visible_to(p_peer)) {398is_visible = true;399break;400}401is_visible = false;402}403// Spawn (and despawn) when needed.404HashSet<int> to_spawn;405HashSet<int> to_despawn;406if (p_peer) {407ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);408if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) {409return OK;410}411if (is_visible) {412to_spawn.insert(p_peer);413} else {414to_despawn.insert(p_peer);415}416} else {417// Check visibility for each peers.418for (const KeyValue<int, PeerInfo> &E : peers_info) {419if (is_visible) {420// This is fast, since the object is visible to everyone, we don't need to check each peer.421if (E.value.spawn_nodes.has(p_oid)) {422// Already spawned.423continue;424}425to_spawn.insert(E.key);426} else {427// Need to check visibility for each peer.428_update_spawn_visibility(E.key, p_oid);429}430}431}432if (to_spawn.size()) {433int len = 0;434_make_spawn_packet(node, spawner, len);435for (int pid : to_spawn) {436ERR_CONTINUE(!peers_info.has(pid));437int path_id;438multiplayer_cache->send_object_cache(spawner, pid, path_id);439_send_raw(packet_cache.ptr(), len, pid, true);440peers_info[pid].spawn_nodes.insert(p_oid);441}442}443if (to_despawn.size()) {444int len = 0;445_make_despawn_packet(node, len);446for (int pid : to_despawn) {447ERR_CONTINUE(!peers_info.has(pid));448peers_info[pid].spawn_nodes.erase(p_oid);449_send_raw(packet_cache.ptr(), len, pid, true);450}451}452return OK;453}454455Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, int p_peer, bool p_reliable) {456ERR_FAIL_COND_V(!p_buffer || p_size < 1, ERR_INVALID_PARAMETER);457458Ref<MultiplayerPeer> peer = multiplayer->get_multiplayer_peer();459ERR_FAIL_COND_V(peer.is_null(), ERR_UNCONFIGURED);460peer->set_transfer_channel(0);461peer->set_transfer_mode(p_reliable ? MultiplayerPeer::TRANSFER_MODE_RELIABLE : MultiplayerPeer::TRANSFER_MODE_UNRELIABLE);462return multiplayer->send_command(p_peer, p_buffer, p_size);463}464465Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) {466ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG);467468const ObjectID oid = p_node->get_instance_id();469TrackedNode *tnode = tracked_nodes.getptr(oid);470ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);471472if (tnode->net_id == 0) {473// Ensure the node has an ID.474tnode->net_id = ++last_net_id;475}476uint32_t nid = tnode->net_id;477ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED);478479// Prepare custom arg and scene_id480uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid);481bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID;482Variant spawn_arg = p_spawner->get_spawn_argument(oid);483int spawn_arg_size = 0;484if (is_custom) {485Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false);486ERR_FAIL_COND_V(err, err);487}488489// Prepare spawn state.490List<NodePath> state_props;491List<uint32_t> sync_ids;492const HashSet<ObjectID> synchronizers = tnode->synchronizers;493for (const ObjectID &sid : synchronizers) {494MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);495if (!_has_authority(sync)) {496continue;497}498ERR_CONTINUE(!sync);499ERR_FAIL_NULL_V(sync->get_replication_config_ptr(), ERR_BUG);500for (const NodePath &prop : sync->get_replication_config_ptr()->get_spawn_properties()) {501state_props.push_back(prop);502}503// Ensure the synchronizer has an ID.504if (sync->get_net_id() == 0) {505sync->set_net_id(++last_net_id);506}507sync_ids.push_back(sync->get_net_id());508}509int state_size = 0;510Vector<Variant> state_vars;511Vector<const Variant *> state_varp;512if (state_props.size()) {513Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp);514ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state.");515err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size);516ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state.");517}518519// Encode scene ID, path ID, net ID, node name.520int path_id = multiplayer_cache->make_object_cache(p_spawner);521CharString cname = p_node->get_name().operator String().utf8();522int nlen = encode_cstring(cname.get_data(), nullptr);523MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size);524uint8_t *ptr = packet_cache.ptrw();525ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN;526ptr[1] = scene_id;527int ofs = 2;528ofs += encode_uint32(path_id, &ptr[ofs]);529ofs += encode_uint32(nid, &ptr[ofs]);530ofs += encode_uint32(sync_ids.size(), &ptr[ofs]);531ofs += encode_uint32(nlen, &ptr[ofs]);532for (uint32_t snid : sync_ids) {533ofs += encode_uint32(snid, &ptr[ofs]);534}535ofs += encode_cstring(cname.get_data(), &ptr[ofs]);536// Write args537if (is_custom) {538ofs += encode_uint32(spawn_arg_size, &ptr[ofs]);539Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, &ptr[ofs], spawn_arg_size, false);540ERR_FAIL_COND_V(err, err);541ofs += spawn_arg_size;542}543// Write state.544if (state_size) {545Error err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), &ptr[ofs], state_size);546ERR_FAIL_COND_V(err, err);547ofs += state_size;548}549r_len = ofs;550return OK;551}552553Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) {554const ObjectID oid = p_node->get_instance_id();555const TrackedNode *tnode = tracked_nodes.getptr(oid);556ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);557MAKE_ROOM(5);558uint8_t *ptr = packet_cache.ptrw();559ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN;560int ofs = 1;561uint32_t nid = tnode->net_id;562ofs += encode_uint32(nid, &ptr[ofs]);563r_len = ofs;564return OK;565}566567Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {568ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received");569int ofs = 1; // The spawn/despawn command.570uint8_t scene_id = p_buffer[ofs];571ofs += 1;572uint32_t node_target = decode_uint32(&p_buffer[ofs]);573ofs += 4;574MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(multiplayer_cache->get_cached_object(p_from, node_target));575ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);576ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);577578uint32_t net_id = decode_uint32(&p_buffer[ofs]);579ofs += 4;580uint32_t sync_len = decode_uint32(&p_buffer[ofs]);581ofs += 4;582uint32_t name_len = decode_uint32(&p_buffer[ofs]);583ofs += 4;584ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4)));585List<uint32_t> sync_ids;586for (uint32_t i = 0; i < sync_len; i++) {587sync_ids.push_back(decode_uint32(&p_buffer[ofs]));588ofs += 4;589}590ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size.");591592// We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names.593const String name = String::utf8((const char *)&p_buffer[ofs], name_len);594ERR_FAIL_COND_V_MSG(name.validate_node_name() != name, ERR_INVALID_DATA, vformat("Invalid node name received: '%s'. Make sure to add nodes via 'add_child(node, true)' remotely.", name));595ofs += name_len;596597// Check that we can spawn.598Node *parent = spawner->get_node_or_null(spawner->get_spawn_path());599ERR_FAIL_NULL_V(parent, ERR_UNCONFIGURED);600ERR_FAIL_COND_V(parent->has_node(name), ERR_INVALID_DATA);601602Node *node = nullptr;603if (scene_id == MultiplayerSpawner::INVALID_ID) {604// Custom spawn.605ERR_FAIL_COND_V(p_buffer_len - ofs < 4, ERR_INVALID_DATA);606uint32_t arg_size = decode_uint32(&p_buffer[ofs]);607ofs += 4;608ERR_FAIL_COND_V(arg_size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);609Variant v;610Error err = MultiplayerAPI::decode_and_decompress_variant(v, &p_buffer[ofs], arg_size, nullptr, false);611ERR_FAIL_COND_V(err != OK, err);612ofs += arg_size;613node = spawner->instantiate_custom(v);614} else {615// Scene based spawn.616node = spawner->instantiate_scene(scene_id);617}618ERR_FAIL_NULL_V(node, ERR_UNAUTHORIZED);619node->set_name(name);620621// Add and track remote622ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE);623ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE);624ObjectID oid = node->get_instance_id();625TrackedNode &tobj = _track(oid);626tobj.spawner = spawner->get_instance_id();627tobj.net_id = net_id;628tobj.remote_peer = p_from;629peers_info[p_from].recv_nodes[net_id] = oid;630631// The initial state will be applied during the sync config (i.e. before _ready).632pending_spawn = node->get_instance_id();633pending_spawn_remote = p_from;634pending_buffer_size = p_buffer_len - ofs;635pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr;636pending_sync_net_ids = sync_ids;637638parent->add_child(node);639spawner->emit_signal(SNAME("spawned"), node);640641pending_spawn = ObjectID();642pending_spawn_remote = 0;643pending_buffer = nullptr;644pending_buffer_size = 0;645if (pending_sync_net_ids.size()) {646pending_sync_net_ids.clear();647ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed.648}649return OK;650}651652Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {653ERR_FAIL_COND_V_MSG(p_buffer_len < 5, ERR_INVALID_DATA, "Invalid spawn packet received");654int ofs = 1; // The spawn/despawn command.655uint32_t net_id = decode_uint32(&p_buffer[ofs]);656ofs += 4;657658// Untrack remote659ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED);660PeerInfo &pinfo = peers_info[p_from];661ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED);662Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]);663ERR_FAIL_NULL_V(node, ERR_BUG);664pinfo.recv_nodes.erase(net_id);665666const ObjectID oid = node->get_instance_id();667ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG);668MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner);669ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);670ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);671672if (node->get_parent() != nullptr) {673node->get_parent()->remove_child(node);674}675node->queue_free();676spawner->emit_signal(SNAME("despawned"), node);677678return OK;679}680681bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {682r_net_id = p_sync->get_net_id();683if (r_net_id == 0 || (r_net_id & 0x80000000)) {684int path_id = 0;685bool verified = multiplayer_cache->send_object_cache(p_sync, p_peer, path_id);686ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");687if (r_net_id == 0) {688// First time path based ID.689r_net_id = path_id | 0x80000000;690p_sync->set_net_id(r_net_id | 0x80000000);691}692return verified;693}694return true;695}696697MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {698MultiplayerSynchronizer *sync = nullptr;699if (p_net_id & 0x80000000) {700sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer_cache->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));701} else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {702const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];703sync = get_id_as<MultiplayerSynchronizer>(sid);704}705return sync;706}707708void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> &p_last_watch_usecs) {709MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);710uint8_t *ptr = packet_cache.ptrw();711ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);712int ofs = 1;713for (const ObjectID &oid : p_synchronizers) {714MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);715ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));716uint32_t net_id;717if (!_verify_synchronizer(p_peer, sync, net_id)) {718continue;719}720uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;721uint64_t indexes;722List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);723724if (!delta.size()) {725continue; // Nothing to update.726}727728Vector<const Variant *> varp;729varp.resize(delta.size());730const Variant **vptr = varp.ptrw();731int i = 0;732for (const Variant &v : delta) {733vptr[i] = &v;734i++;735}736int size;737Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);738ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");739740ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));741742if (ofs + 4 + 8 + 4 + size > delta_mtu) {743// Send what we got, and reset write.744_send_raw(packet_cache.ptr(), ofs, p_peer, true);745ofs = 1;746}747if (size) {748ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);749ofs += encode_uint64(indexes, &ptr[ofs]);750ofs += encode_uint32(size, &ptr[ofs]);751MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);752ofs += size;753}754#ifdef DEBUG_ENABLED755_profile_node_data("delta_out", oid, size);756#endif757peers_info[p_peer].last_watch_usecs[oid] = p_usec;758}759if (ofs > 1) {760// Got some left over to send.761_send_raw(packet_cache.ptr(), ofs, p_peer, true);762}763}764765Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {766int ofs = 1;767while (ofs + 4 + 8 + 4 < p_buffer_len) {768uint32_t net_id = decode_uint32(&p_buffer[ofs]);769ofs += 4;770uint64_t indexes = decode_uint64(&p_buffer[ofs]);771ofs += 8;772uint32_t size = decode_uint32(&p_buffer[ofs]);773ofs += 4;774ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);775MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);776Node *node = sync ? sync->get_root_node() : nullptr;777if (!sync || sync->get_multiplayer_authority() != p_from || !node) {778ofs += size;779ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");780}781List<NodePath> props = sync->get_delta_properties(indexes);782ERR_FAIL_COND_V(props.is_empty(), ERR_INVALID_DATA);783Vector<Variant> vars;784vars.resize(props.size());785int consumed = 0;786Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);787ERR_FAIL_COND_V(err != OK, err);788ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);789err = MultiplayerSynchronizer::set_state(props, node, vars);790ERR_FAIL_COND_V(err != OK, err);791ofs += size;792sync->emit_signal(SNAME("delta_synchronized"));793#ifdef DEBUG_ENABLED794_profile_node_data("delta_in", sync->get_instance_id(), size);795#endif796}797return OK;798}799800void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {801MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);802uint8_t *ptr = packet_cache.ptrw();803ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;804int ofs = 1;805ofs += encode_uint16(p_sync_net_time, &ptr[1]);806// Can only send updates for already notified nodes.807// This is a lazy implementation, we could optimize much more here with by grouping by replication config.808for (const ObjectID &oid : p_synchronizers) {809MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);810ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));811if (!sync->update_outbound_sync_time(p_usec)) {812continue; // nothing to sync.813}814815Node *node = sync->get_root_node();816ERR_CONTINUE(!node);817uint32_t net_id = sync->get_net_id();818if (!_verify_synchronizer(p_peer, sync, net_id)) {819// The path based sync is not yet confirmed, skipping.820continue;821}822int size;823Vector<Variant> vars;824Vector<const Variant *> varp;825const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();826Error err = MultiplayerSynchronizer::get_state(props, node, vars, varp);827ERR_CONTINUE_MSG(err != OK, "Unable to retrieve sync state.");828err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);829ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");830// TODO Handle single state above MTU.831ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));832if (ofs + 4 + 4 + size > sync_mtu) {833// Send what we got, and reset write.834_send_raw(packet_cache.ptr(), ofs, p_peer, false);835ofs = 3;836}837if (size) {838ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);839ofs += encode_uint32(size, &ptr[ofs]);840MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size);841ofs += size;842}843#ifdef DEBUG_ENABLED844_profile_node_data("sync_out", oid, size);845#endif846}847if (ofs > 3) {848// Got some left over to send.849_send_raw(packet_cache.ptr(), ofs, p_peer, false);850}851}852853Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {854ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");855bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;856if (is_delta) {857return on_delta_receive(p_from, p_buffer, p_buffer_len);858}859uint16_t time = decode_uint16(&p_buffer[1]);860int ofs = 3;861while (ofs + 8 < p_buffer_len) {862uint32_t net_id = decode_uint32(&p_buffer[ofs]);863ofs += 4;864uint32_t size = decode_uint32(&p_buffer[ofs]);865ofs += 4;866ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);867MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);868if (!sync) {869// Not received yet.870ofs += size;871continue;872}873Node *node = sync->get_root_node();874if (sync->get_multiplayer_authority() != p_from || !node) {875// Not valid for me.876ofs += size;877ERR_CONTINUE_MSG(true, "Ignoring sync data from non-authority or for missing node.");878}879if (!sync->update_inbound_sync_time(time)) {880// State is too old.881ofs += size;882continue;883}884const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();885Vector<Variant> vars;886vars.resize(props.size());887int consumed;888Error err = MultiplayerAPI::decode_and_decompress_variants(vars, &p_buffer[ofs], size, consumed);889ERR_FAIL_COND_V(err, err);890err = MultiplayerSynchronizer::set_state(props, node, vars);891ERR_FAIL_COND_V(err, err);892ofs += size;893sync->emit_signal(SNAME("synchronized"));894#ifdef DEBUG_ENABLED895_profile_node_data("sync_in", sync->get_instance_id(), size);896#endif897}898return OK;899}900901void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {902ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");903sync_mtu = p_size;904}905906int SceneReplicationInterface::get_max_sync_packet_size() const {907return sync_mtu;908}909910void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {911ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");912delta_mtu = p_size;913}914915int SceneReplicationInterface::get_max_delta_packet_size() const {916return delta_mtu;917}918919920