diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b383226..3d96aeb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,19 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Added - -- `TicksMap` with mapping from server ticks to system change ticks. - ### Changed +- Send all component mappings, inserts, removals and despawns over reliable channel in form of deltas and component updates over unreliable channel packed by packet size. This significantly reduces the possibility of packet loss. +- Replace `REPLICATION_CHANNEL_ID` with `ReplicationChannel` enum. The previous constant corresponded to the unreliable channel. +- Server events use tick with the last change instead of waiting for replication message without changes. +- `TickPolicy::EveryFrame` and `TickPolicy::MaxTickRate` now increment tick only if `RenetServer` exists. +- `ServerSet::Send` now always runs. Replication sending system still runs on `RepliconTick` change. +- `ClientMapping` no longer contains `tick` field. - Use `EntityHashMap` instead of `HashMap` with entities as keys. - Use `Cursor<&[u8]>` instead of `Cursor`. - Replace `LastRepliconTick` with `RepliconTick` on client. -- `AckedTicks` now returns the map via `deref` instead of via separate method. - Fix missing reset of `RepliconTick` on server disconnect. - Rename `replicate_into_scene` into `replicate_into` and move it to `scene` module. +### Removed + +- `AckedTicks` resource. +- `TicksMap` resource. + ## [0.17.0] - 2023-11-13 ### Added diff --git a/Cargo.toml b/Cargo.toml index 89a405cf..79cf473d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,9 @@ include = ["/benches", "/src", "/tests", "/LICENSE*"] [dependencies] bevy_renet = "0.0.10" -bevy = { version = "0.12", default-features = false, features = ["bevy_scene"] } +bevy = { version = "0.12.1", default-features = false, features = [ + "bevy_scene", +] } bincode = "1.3" serde = "1.0" varint-rs = "2.2" diff --git a/deny.toml b/deny.toml index 0b9aae52..1abcc01f 100644 --- a/deny.toml +++ b/deny.toml @@ -11,10 +11,14 @@ allow-osi-fsf-free = "either" multiple-versions = "deny" wildcards = "allow" skip = [ + { name = "async-channel", version = "1.9" }, + { name = "async-lock", version = "2.8" }, { name = "bitflags", version = "2.0" }, + { name = "event-listener", version = "2.5" }, { name = "fastrand", version = "1.9" }, { name = "foreign-types", version = "0.3" }, { name = "foreign-types-shared", version = "0.1" }, + { name = "futures-lite", version = "1.13" }, { name = "hashbrown", version = "0.12" }, { name = "indexmap", version = "1.0" }, { name = "libloading", version = "0.7" }, @@ -26,6 +30,7 @@ skip = [ { name = "regex-syntax", version = "0.7" }, { name = "syn", version = "1.0" }, { name = "toml_edit", version = "0.19" }, + { name = "tracing-log", version = "0.1" }, { name = "windows", version = "0.44" }, { name = "windows-sys", version = "0.45" }, { name = "windows-targets", version = "0.42" }, diff --git a/src/client.rs b/src/client.rs index d7bcc063..86be8cb6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,7 +6,7 @@ use bevy::{ prelude::*, utils::{hashbrown::hash_map::Entry, EntityHashMap}, }; -use bevy_renet::client_connected; +use bevy_renet::{client_connected, renet::Bytes}; use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin}; use bincode::{DefaultOptions, Options}; use varint_rs::VarintReader; @@ -14,7 +14,7 @@ use varint_rs::VarintReader; use crate::replicon_core::{ replication_rules::{Mapper, Replication, ReplicationRules}, replicon_tick::RepliconTick, - REPLICATION_CHANNEL_ID, + ReplicationChannel, }; use diagnostics::ClientStats; @@ -24,6 +24,8 @@ impl Plugin for ClientPlugin { fn build(&self, app: &mut App) { app.add_plugins((RenetClientPlugin, NetcodeClientPlugin)) .init_resource::() + .init_resource::() + .init_resource::() .configure_sets( PreUpdate, ClientSet::Receive.after(NetcodeClientPlugin::update_system), @@ -41,61 +43,141 @@ impl Plugin for ClientPlugin { ) .add_systems( PostUpdate, - ( - Self::ack_sending_system - .in_set(ClientSet::Send) - .run_if(client_connected()), - Self::reset_system.run_if(resource_removed::()), - ), + Self::reset_system.run_if(resource_removed::()), ); } } impl ClientPlugin { + /// Receives and applies replication messages from the server. + /// + /// Tick init messages are sent over the [`ReplicationChannel::Reliable`] and are applied first to ensure valid state + /// for entity updates. + /// + /// Entity update messages are sent over [`ReplicationChannel::Unreliable`], which means they may appear + /// ahead-of or behind init messages from the same server tick. An update will only be applied if its + /// change tick has already appeared in an init message, otherwise it will be buffered while waiting. + /// Since entity updates can arrive in any order, updates will only be applied if they correspond to a more + /// recent server tick than the last acked server tick for each entity. + /// + /// Buffered entity update messages are processed last. + /// + /// Acknowledgments for received entity update messages are sent back to the server. + /// + /// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages). pub(super) fn replication_receiving_system(world: &mut World) -> bincode::Result<()> { world.resource_scope(|world, mut client: Mut| { world.resource_scope(|world, mut entity_map: Mut| { - world.resource_scope(|world, replication_rules: Mut| { - let mut stats = world.remove_resource::(); - while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) { - apply_message( - &message, - world, - &mut entity_map, - stats.as_mut(), - &replication_rules, - )?; - } - - if let Some(stats) = stats { - world.insert_resource(stats); - } - - Ok(()) + world.resource_scope(|world, mut entity_ticks: Mut| { + world.resource_scope(|world, mut buffered_updates: Mut| { + world.resource_scope(|world, replication_rules: Mut| { + let mut stats = world.remove_resource::(); + apply_replication( + world, + &mut client, + &mut entity_map, + &mut entity_ticks, + &mut buffered_updates, + stats.as_mut(), + &replication_rules, + )?; + + if let Some(stats) = stats { + world.insert_resource(stats); + } + + Ok(()) + }) + }) }) }) }) } - fn ack_sending_system(replicon_tick: Res, mut client: ResMut) { - let message = bincode::serialize(&*replicon_tick) - .unwrap_or_else(|e| panic!("client ack should be serialized: {e}")); - client.send_message(REPLICATION_CHANNEL_ID, message); - } - fn reset_system( mut replicon_tick: ResMut, mut entity_map: ResMut, + mut entity_ticks: ResMut, + mut buffered_updates: ResMut, ) { *replicon_tick = Default::default(); entity_map.clear(); + entity_ticks.clear(); + buffered_updates.clear(); } } -fn apply_message( +/// Reads all received messages and applies them. +/// +/// Sends acknowledgments for update messages back. +fn apply_replication( + world: &mut World, + client: &mut RenetClient, + entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, + buffered_updates: &mut BufferedUpdates, + mut stats: Option<&mut ClientStats>, + replication_rules: &ReplicationRules, +) -> Result<(), Box> { + while let Some(message) = client.receive_message(ReplicationChannel::Reliable) { + apply_init_message( + &message, + world, + entity_map, + entity_ticks, + stats.as_deref_mut(), + replication_rules, + )?; + } + + let replicon_tick = *world.resource::(); + while let Some(message) = client.receive_message(ReplicationChannel::Unreliable) { + let index = apply_update_message( + message, + world, + entity_map, + entity_ticks, + buffered_updates, + stats.as_deref_mut(), + replication_rules, + replicon_tick, + )?; + + client.send_message(ReplicationChannel::Reliable, bincode::serialize(&index)?) + } + + let mut result = Ok(()); + buffered_updates.retain(|update| { + if update.last_change_tick > replicon_tick { + return true; + } + + trace!("applying buffered update message for {replicon_tick:?}"); + if let Err(e) = apply_update_components( + &mut Cursor::new(&*update.message), + world, + entity_map, + entity_ticks, + stats.as_deref_mut(), + replication_rules, + update.message_tick, + ) { + result = Err(e); + } + + false + }); + result?; + + Ok(()) +} + +/// Applies [`InitMessage`](crate::server::replication_messages::InitMessage). +fn apply_init_message( message: &[u8], world: &mut World, entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, mut stats: Option<&mut ClientStats>, replication_rules: &ReplicationRules, ) -> bincode::Result<()> { @@ -106,39 +188,39 @@ fn apply_message( stats.bytes += end_pos; } - let Some(tick) = apply_tick(&mut cursor, world)? else { - return Ok(()); - }; - if cursor.position() == end_pos { - return Ok(()); - } + let replicon_tick = bincode::deserialize_from(&mut cursor)?; + trace!("applying init message for {replicon_tick:?}"); + *world.resource_mut::() = replicon_tick; + debug_assert!(cursor.position() < end_pos, "init message can't be empty"); apply_entity_mappings(&mut cursor, world, entity_map, stats.as_deref_mut())?; if cursor.position() == end_pos { return Ok(()); } - apply_components( + apply_init_components( &mut cursor, world, entity_map, + entity_ticks, stats.as_deref_mut(), - ComponentsKind::Change, + ComponentsKind::Insert, replication_rules, - tick, + replicon_tick, )?; if cursor.position() == end_pos { return Ok(()); } - apply_components( + apply_init_components( &mut cursor, world, entity_map, + entity_ticks, stats.as_deref_mut(), ComponentsKind::Removal, replication_rules, - tick, + replicon_tick, )?; if cursor.position() == end_pos { return Ok(()); @@ -148,32 +230,62 @@ fn apply_message( &mut cursor, world, entity_map, + entity_ticks, replication_rules, - tick, + replicon_tick, stats, )?; Ok(()) } -/// Deserializes server tick and applies it to [`LastTick`] if it is newer. +/// Applies [`UpdateMessage`](crate::server::replication_messages::UpdateMessage). /// -/// Returns the tick if [`LastTick`] has been updated. -fn apply_tick( - cursor: &mut Cursor<&[u8]>, +/// If the update message can't be applied yet (because the init message with the +/// corresponding tick hasn't arrived), it will be buffered. +/// +/// Returns update index to be used for acknowledgment. +#[allow(clippy::too_many_arguments)] +fn apply_update_message( + message: Bytes, world: &mut World, -) -> bincode::Result> { - let tick = bincode::deserialize_from(cursor)?; - - let mut replicon_tick = world.resource_mut::(); - if *replicon_tick < tick { - trace!("applying {tick:?} over {replicon_tick:?}"); - *replicon_tick = tick; - Ok(Some(tick)) - } else { - trace!("discarding {tick:?}, which is older then {replicon_tick:?}"); - Ok(None) + entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, + buffered_updates: &mut BufferedUpdates, + mut stats: Option<&mut ClientStats>, + replication_rules: &ReplicationRules, + replicon_tick: RepliconTick, +) -> bincode::Result { + let end_pos: u64 = message.len().try_into().unwrap(); + let mut cursor = Cursor::new(&*message); + if let Some(stats) = &mut stats { + stats.packets += 1; + stats.bytes += end_pos; } + + let (last_change_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?; + if last_change_tick > replicon_tick { + trace!("buffering update message for {replicon_tick:?}"); + buffered_updates.push(BufferedUpdate { + last_change_tick, + message_tick, + message: message.slice(cursor.position() as usize..), + }); + return Ok(update_index); + } + + trace!("applying update message for {replicon_tick:?}"); + apply_update_components( + &mut cursor, + world, + entity_map, + entity_ticks, + stats, + replication_rules, + message_tick, + )?; + + Ok(update_index) } /// Applies received server mappings from client's pre-spawned entities. @@ -191,14 +303,6 @@ fn apply_entity_mappings( let server_entity = deserialize_entity(cursor)?; let client_entity = deserialize_entity(cursor)?; - if let Some(entry) = entity_map.to_client().get(&server_entity) { - // It's possible to receive the same mappings in multiple packets if the server has not - // yet received an ack from the client for the tick when the mapping was created. - if *entry != client_entity { - panic!("received mapping from {server_entity:?} to {client_entity:?}, but already mapped to {entry:?}"); - } - } - if let Some(mut entity) = world.get_entity_mut(client_entity) { debug!("received mapping from {server_entity:?} to {client_entity:?}"); entity.insert(Replication); @@ -212,34 +316,41 @@ fn apply_entity_mappings( } /// Deserializes replicated components of `components_kind` and applies them to the `world`. -fn apply_components( +#[allow(clippy::too_many_arguments)] +fn apply_init_components( cursor: &mut Cursor<&[u8]>, world: &mut World, entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, mut stats: Option<&mut ClientStats>, components_kind: ComponentsKind, replication_rules: &ReplicationRules, - tick: RepliconTick, + replicon_tick: RepliconTick, ) -> bincode::Result<()> { let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?; for _ in 0..entities_count { let entity = deserialize_entity(cursor)?; + let data_len: u16 = bincode::deserialize_from(&mut *cursor)?; let mut entity = entity_map.get_by_server_or_spawn(world, entity); - let components_count: u8 = bincode::deserialize_from(&mut *cursor)?; - if let Some(stats) = &mut stats { - stats.entities_changed += 1; - stats.components_changed += components_count as u32; - } - for _ in 0..components_count { + entity_ticks.insert(entity.id(), replicon_tick); + + let end_pos = cursor.position() + data_len as u64; + let mut components_count = 0u32; + while cursor.position() < end_pos { let replication_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; // SAFETY: server and client have identical `ReplicationRules` and server always sends valid IDs. let replication_info = unsafe { replication_rules.get_info_unchecked(replication_id) }; match components_kind { - ComponentsKind::Change => { - (replication_info.deserialize)(&mut entity, entity_map, cursor, tick)? + ComponentsKind::Insert => { + (replication_info.deserialize)(&mut entity, entity_map, cursor, replicon_tick)? } - ComponentsKind::Removal => (replication_info.remove)(&mut entity, tick), + ComponentsKind::Removal => (replication_info.remove)(&mut entity, replicon_tick), } + components_count += 1; + } + if let Some(stats) = &mut stats { + stats.entities_changed += 1; + stats.components_changed += components_count; } } @@ -251,8 +362,9 @@ fn apply_despawns( cursor: &mut Cursor<&[u8]>, world: &mut World, entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, replication_rules: &ReplicationRules, - tick: RepliconTick, + replicon_tick: RepliconTick, stats: Option<&mut ClientStats>, ) -> bincode::Result<()> { let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?; @@ -268,7 +380,58 @@ fn apply_despawns( .remove_by_server(server_entity) .and_then(|entity| world.get_entity_mut(entity)) { - (replication_rules.despawn_fn)(client_entity, tick); + entity_ticks.remove(&client_entity.id()); + (replication_rules.despawn_fn)(client_entity, replicon_tick); + } + } + + Ok(()) +} + +/// Deserializes replicated component updates and applies them to the `world`. +/// +/// Consumes all remaining bytes in the cursor. +fn apply_update_components( + cursor: &mut Cursor<&[u8]>, + world: &mut World, + entity_map: &mut ServerEntityMap, + entity_ticks: &mut ServerEntityTicks, + mut stats: Option<&mut ClientStats>, + replication_rules: &ReplicationRules, + message_tick: RepliconTick, +) -> bincode::Result<()> { + let len = cursor.get_ref().len() as u64; + while cursor.position() < len { + let entity = deserialize_entity(cursor)?; + let data_len: u16 = bincode::deserialize_from(&mut *cursor)?; + let Some(mut entity) = entity_map.get_by_server(world, entity) else { + // Update could arrive after a despawn from init message. + debug!("ignoring update received for unknown server's {entity:?}"); + cursor.set_position(cursor.position() + data_len as u64); + continue; + }; + let entity_tick = entity_ticks + .get_mut(&entity.id()) + .expect("all entities from update should have assigned ticks"); + if message_tick <= *entity_tick { + trace!("ignoring outdated update for client's {:?}", entity.id()); + cursor.set_position(cursor.position() + data_len as u64); + continue; + } + *entity_tick = message_tick; + + let end_pos = cursor.position() + data_len as u64; + let mut components_count = 0u32; + while cursor.position() < end_pos { + let replication_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + // SAFETY: server and client have identical `ReplicationRules` and server always sends valid IDs. + let replication_info = unsafe { replication_rules.get_info_unchecked(replication_id) }; + (replication_info.deserialize)(&mut entity, entity_map, cursor, message_tick)?; + components_count += 1; + } + if let Some(stats) = &mut stats { + stats.entities_changed += 1; + stats.components_changed += components_count; } } @@ -277,7 +440,7 @@ fn apply_despawns( /// Deserializes `entity` from compressed index and generation. /// -/// For details see [`ReplicationBuffer::write_entity`](crate::server::replication_buffer::ReplicationBuffer::write_entity). +/// For details see [`ReplicationBuffer::write_entity`](crate::server::replication_message::replication_buffer::write_entity). fn deserialize_entity(cursor: &mut Cursor<&[u8]>) -> bincode::Result { let flagged_index: u64 = cursor.read_u64_varint()?; let has_generation = (flagged_index & 1) > 0; @@ -296,7 +459,7 @@ fn deserialize_entity(cursor: &mut Cursor<&[u8]>) -> bincode::Result { /// /// Parameter for [`apply_components`]. enum ComponentsKind { - Change, + Insert, Removal, } @@ -314,8 +477,6 @@ pub enum ClientSet { } /// Maps server entities to client entities and vice versa. -/// -/// Used only on client. #[derive(Default, Resource)] pub struct ServerEntityMap { server_to_client: EntityHashMap, @@ -323,9 +484,16 @@ pub struct ServerEntityMap { } impl ServerEntityMap { + /// Inserts a server-client pair into the map. + /// + /// # Panics + /// + /// Panics if this mapping is already present. #[inline] pub fn insert(&mut self, server_entity: Entity, client_entity: Entity) { - self.server_to_client.insert(server_entity, client_entity); + if let Some(existing_entity) = self.server_to_client.insert(server_entity, client_entity) { + panic!("mapping {server_entity:?} to {client_entity:?}, but it's already mapped to {existing_entity:?}"); + } self.client_to_server.insert(client_entity, server_entity); } @@ -346,6 +514,16 @@ impl ServerEntityMap { } } + pub(super) fn get_by_server<'a>( + &mut self, + world: &'a mut World, + server_entity: Entity, + ) -> Option> { + self.server_to_client + .get(&server_entity) + .map(|&entity| world.entity_mut(entity)) + } + pub(super) fn remove_by_server(&mut self, server_entity: Entity) -> Option { let client_entity = self.server_to_client.remove(&server_entity); if let Some(client_entity) = client_entity { @@ -399,3 +577,28 @@ impl Mapper for ClientMapper<'_> { }) } } + +/// Last received tick for each entity. +/// +/// Used to avoid applying old updates. +#[derive(Default, Deref, DerefMut, Resource)] +pub(super) struct ServerEntityTicks(EntityHashMap); + +#[derive(Default, Deref, DerefMut, Resource)] +pub(super) struct BufferedUpdates(Vec); + +/// Caches a partially-deserialized entity update message that is waiting for its tick to appear in an init message. +/// +/// See also [`crate::server::replication_messages::UpdateMessage`]. +pub(super) struct BufferedUpdate { + /// Required tick to wait for. + /// + /// See also [`crate::server::LastChangeTick`]. + last_change_tick: RepliconTick, + + /// The tick this update corresponds to. + message_tick: RepliconTick, + + /// Update data. + message: Bytes, +} diff --git a/src/lib.rs b/src/lib.rs index 6ab1ac39..4c328e31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,7 +117,7 @@ fn deserialize_transform( entity: &mut EntityWorldMut, _entity_map: &mut ServerEntityMap, cursor: &mut Cursor<&[u8]>, - _tick: RepliconTick, + _replicon_tick: RepliconTick, ) -> bincode::Result<()> { let translation: Vec3 = bincode::deserialize_from(cursor)?; entity.insert(Transform::from_translation(translation)); @@ -375,12 +375,29 @@ They rarely used for gameplay systems (since you write the same logic for multiplayer and single-player!), but could be used for server creation / connection systems and corresponding UI. +## Eventual consistency + +All events, inserts, removals and despawns will be applied to clients in the same order as on the server. + +Entity component updates are grouped by entity, and component groupings may be applied to clients in a different order than on the server. +For example, if two entities are spawned in tick 1 on the server and their components are updated in tick 2, +then the client is guaranteed to see the spawns at the same time, but the component updates may appear in different client ticks. + +If a component is dependent on other data, updates to the component will only be applied to the client when that data has arrived. +So if your component references another entity, updates to that component will only be applied when the referenced entity has been spawned on the client. + +Updates for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. + +Clients should never assume their world state is the same as the server's on any given tick value-wise. +World state on the client is only "eventually consistent" with the server's. + ## Limits To reduce packet size there are the following limits per replication update: -- Up to [`u16::MAX`] entities that have changed/added components with up to [`u8::MAX`] such components. -- Up to [`u16::MAX`] entities that have removed components with up to [`u8::MAX`] such components. +- Up to [`u16::MAX`] entities that have added components with up to [`u16::MAX`] bytes of component data. +- Up to [`u16::MAX`] entities that have changed components with up to [`u16::MAX`] bytes of component data. +- Up to [`u16::MAX`] entities that have removed components with up to [`u16::MAX`] bytes of component data. - Up to [`u16::MAX`] entities that were despawned. */ @@ -410,10 +427,10 @@ pub mod prelude { ReplicationRules, }, replicon_tick::RepliconTick, - NetworkChannels, RepliconCorePlugin, REPLICATION_CHANNEL_ID, + NetworkChannels, ReplicationChannel, RepliconCorePlugin, }, server::{ - has_authority, AckedTicks, ClientEntityMap, ClientMapping, ServerPlugin, ServerSet, + has_authority, ClientEntityMap, ClientMapping, LastChangeTick, ServerPlugin, ServerSet, TickPolicy, SERVER_ID, }, ReplicationPlugins, diff --git a/src/network_event/server_event.rs b/src/network_event/server_event.rs index ecff838e..3cdbfe37 100644 --- a/src/network_event/server_event.rs +++ b/src/network_event/server_event.rs @@ -15,7 +15,7 @@ use crate::{ replicon_core::{ replication_rules::MapNetworkEntities, replicon_tick::RepliconTick, NetworkChannels, }, - server::{has_authority, MinRepliconTick, ServerSet, SERVER_ID}, + server::{has_authority, LastChangeTick, ServerSet, SERVER_ID}, }; /// An extension trait for [`App`] for creating server events. @@ -66,13 +66,13 @@ pub trait ServerEventAppExt { fn sending_reflect_system( mut server: ResMut, mut reflect_events: EventReader>, - tick: Res, + last_change_tick: Res, channel: Res>, registry: Res, ) { let registry = registry.read(); for ToClients { event, mode } in reflect_events.read() { - let message = serialize_reflect_event(*tick, &event, ®istry) + let message = serialize_reflect_event(**last_change_tick, &event, ®istry) .expect("server event should be serializable"); server_event::send(&mut server, *channel, *mode, message) @@ -185,12 +185,11 @@ impl ServerEventAppExt for App { PostUpdate, ( ( - (min_tick_update_system::, sending_system) - .run_if(resource_exists::()), + sending_system.run_if(resource_exists::()), local_resending_system::.run_if(has_authority()), ) .chain() - .before(ServerPlugin::replication_sending_system) + .after(ServerPlugin::replication_sending_system) .in_set(ServerSet::Send), reset_system::.run_if(resource_removed::()), ), @@ -261,32 +260,18 @@ fn receiving_and_mapping_system( mut server: ResMut, mut server_events: EventReader>, - tick: Res, + last_change_tick: Res, channel: Res>, ) { for ToClients { event, mode } in server_events.read() { let message = DefaultOptions::new() - .serialize(&(*tick, event)) + .serialize(&(**last_change_tick, event)) .expect("server event should be serializable"); send(&mut server, *channel, *mode, message); } } -/// Updates [`MinRepliconTick`] to force server to send replication message even if there were no world changes. -/// -/// Needed because events on a client won't be emitted until the client acknowledges the event tick. -/// See also [`ServerEventQueue`]. -fn min_tick_update_system( - mut server_events: EventReader>, - mut min_tick: ResMut, - tick: Res, -) { - if server_events.read().count() > 0 { - **min_tick = *tick; - } -} - /// Transforms [`ToClients`] events into `T` events to "emulate" /// message sending for offline mode or when server is also a player fn local_resending_system( diff --git a/src/replicon_core.rs b/src/replicon_core.rs index 682c75e4..6ade076f 100644 --- a/src/replicon_core.rs +++ b/src/replicon_core.rs @@ -1,6 +1,8 @@ pub mod replication_rules; pub mod replicon_tick; +use std::time::Duration; + use bevy::prelude::*; use bevy_renet::renet::{ChannelConfig, SendType}; @@ -18,10 +20,22 @@ impl Plugin for RepliconCorePlugin { } } -/// ID of the server replication channel. +/// ID of a server replication channel. /// /// See also [`NetworkChannels`]. -pub const REPLICATION_CHANNEL_ID: u8 = 0; +#[repr(u8)] +pub enum ReplicationChannel { + /// For sending messages with entity mappings, inserts, removals and despawns. + Reliable, + /// For sending messages with component updates. + Unreliable, +} + +impl From for u8 { + fn from(value: ReplicationChannel) -> Self { + value as u8 + } +} /// A resource to configure and setup channels for [`ConnectionConfig`](bevy_renet::renet::ConnectionConfig) #[derive(Clone, Resource)] @@ -41,9 +55,19 @@ pub struct NetworkChannels { /// Stores only replication channel by default. impl Default for NetworkChannels { fn default() -> Self { + let replication_channels = vec![ + ( + SendType::ReliableOrdered { + resend_time: Duration::ZERO, + }, + None, + ), + (SendType::Unreliable, None), + ]; + Self { - server: vec![(SendType::Unreliable, None)], - client: vec![(SendType::Unreliable, None)], + server: replication_channels.clone(), + client: replication_channels, default_max_bytes: 5 * 1024 * 1024, // Value from `DefaultChannel::config()`. } } @@ -62,7 +86,7 @@ impl NetworkChannels { /// Sets maximum usage bytes for specific client channel. /// - /// [`REPLICATION_CHANNEL_ID`] or [`EventChannel`](crate::network_event::EventChannel) can be passed as `id`. + /// [`ReplicationChannel`] or [`EventChannel`](crate::network_event::EventChannel) can be passed as `id`. /// Without calling this function, the default value will be used. /// See also [`Self::set_default_max_bytes`]. pub fn set_server_max_bytes(&mut self, id: impl Into, max_bytes: usize) { diff --git a/src/replicon_core/replication_rules.rs b/src/replicon_core/replication_rules.rs index b2d2f71b..1aee85da 100644 --- a/src/replicon_core/replication_rules.rs +++ b/src/replicon_core/replication_rules.rs @@ -231,7 +231,7 @@ pub fn deserialize_component( entity: &mut EntityWorldMut, _entity_map: &mut ServerEntityMap, cursor: &mut Cursor<&[u8]>, - _tick: RepliconTick, + _replicon_tick: RepliconTick, ) -> bincode::Result<()> { let component: C = DefaultOptions::new().deserialize_from(cursor)?; entity.insert(component); @@ -244,7 +244,7 @@ pub fn deserialize_mapped_component, - _tick: RepliconTick, + _replicon_tick: RepliconTick, ) -> bincode::Result<()> { let mut component: C = DefaultOptions::new().deserialize_from(cursor)?; @@ -258,11 +258,11 @@ pub fn deserialize_mapped_component(entity: &mut EntityWorldMut, _tick: RepliconTick) { +pub fn remove_component(entity: &mut EntityWorldMut, _replicon_tick: RepliconTick) { entity.remove::(); } /// Default entity despawn function. -pub fn despawn_recursive(entity: EntityWorldMut, _tick: RepliconTick) { +pub fn despawn_recursive(entity: EntityWorldMut, _replicon_tick: RepliconTick) { entity.despawn_recursive(); } diff --git a/src/replicon_core/replicon_tick.rs b/src/replicon_core/replicon_tick.rs index fdbe419a..9d1e4392 100644 --- a/src/replicon_core/replicon_tick.rs +++ b/src/replicon_core/replicon_tick.rs @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize}; /// A tick that increments each time we need the server to compute and send an update. /// /// Updated on clients every time they receive replication from the server. -/// Mapped to the Bevy's `Tick` in [`AckedTicks`](crate::server::AckedTicks). /// See also [`TickPolicy`](crate::server::TickPolicy). #[derive(Clone, Copy, Debug, Default, Deserialize, Eq, Hash, PartialEq, Resource, Serialize)] pub struct RepliconTick(pub(crate) u32); diff --git a/src/server.rs b/src/server.rs index bacf0666..13d02eba 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,18 +1,20 @@ -pub(super) mod despawn_tracker; -pub(super) mod message; -pub(super) mod removal_tracker; +pub(super) mod clients_info; +pub(super) mod replication_buffer; +pub(super) mod replication_messages; -use std::time::Duration; +use std::{mem, time::Duration}; use bevy::{ ecs::{ archetype::ArchetypeId, - component::{StorageType, Tick}, + component::{ComponentTicks, StorageType, Tick}, + removal_detection::RemovedComponentEvents, system::SystemChangeTick, }, prelude::*, + ptr::Ptr, time::common_conditions::on_timer, - utils::HashMap, + utils::{EntityHashMap, HashMap}, }; use bevy_renet::{ renet::{ClientId, RenetClient, RenetServer, ServerEvent}, @@ -21,11 +23,12 @@ use bevy_renet::{ }; use crate::replicon_core::{ - replication_rules::ReplicationRules, replicon_tick::RepliconTick, REPLICATION_CHANNEL_ID, + replication_rules::{Replication, ReplicationId, ReplicationInfo, ReplicationRules}, + replicon_tick::RepliconTick, + ReplicationChannel, }; -use despawn_tracker::{DespawnTracker, DespawnTrackerPlugin}; -use message::ReplicationMessage; -use removal_tracker::{RemovalTracker, RemovalTrackerPlugin}; +use clients_info::{ClientInfo, ClientsInfo}; +use replication_messages::ReplicationMessages; pub const SERVER_ID: ClientId = ClientId::from_raw(0); @@ -43,39 +46,30 @@ impl Default for ServerPlugin { impl Plugin for ServerPlugin { fn build(&self, app: &mut App) { - app.add_plugins(( - RenetServerPlugin, - NetcodeServerPlugin, - RemovalTrackerPlugin, - DespawnTrackerPlugin, - )) - .init_resource::() - .init_resource::() - .init_resource::() - .init_resource::() - .configure_sets(PreUpdate, ServerSet::Receive.after(RenetReceive)) - .configure_sets( - PostUpdate, - ServerSet::Send - .before(RenetSend) - .run_if(resource_changed::()), - ) - .add_systems( - PreUpdate, - (Self::acks_receiving_system, Self::acks_cleanup_system) - .in_set(ServerSet::Receive) - .run_if(resource_exists::()), - ) - .add_systems( - PostUpdate, - ( - Self::replication_sending_system - .map(Result::unwrap) - .in_set(ServerSet::Send) + app.add_plugins((RenetServerPlugin, NetcodeServerPlugin)) + .init_resource::() + .init_resource::() + .init_resource::() + .configure_sets(PreUpdate, ServerSet::Receive.after(RenetReceive)) + .configure_sets(PostUpdate, ServerSet::Send.before(RenetSend)) + .add_systems( + PreUpdate, + (Self::handle_connections_system, Self::acks_receiving_system) + .chain() + .in_set(ServerSet::Receive) .run_if(resource_exists::()), - Self::reset_system.run_if(resource_removed::()), - ), - ); + ) + .add_systems( + PostUpdate, + ( + Self::replication_sending_system + .map(Result::unwrap) + .in_set(ServerSet::Send) + .run_if(resource_exists::()) + .run_if(resource_changed::()), + Self::reset_system.run_if(resource_removed::()), + ), + ); match self.tick_policy { TickPolicy::MaxTickRate(max_tick_rate) => { @@ -84,13 +78,16 @@ impl Plugin for ServerPlugin { PostUpdate, Self::increment_tick .before(Self::replication_sending_system) + .run_if(resource_exists::()) .run_if(on_timer(tick_time)), ); } TickPolicy::EveryFrame => { app.add_systems( PostUpdate, - Self::increment_tick.before(Self::replication_sending_system), + Self::increment_tick + .before(Self::replication_sending_system) + .run_if(resource_exists::()), ); } TickPolicy::Manual => (), @@ -104,149 +101,157 @@ impl ServerPlugin { } /// Increments current server tick which causes the server to replicate this frame. - pub fn increment_tick(mut tick: ResMut) { - tick.increment(); - trace!("incremented {tick:?}"); + pub fn increment_tick(mut replicon_tick: ResMut) { + replicon_tick.increment(); + trace!("incremented {replicon_tick:?}"); } - fn acks_receiving_system( - mut acked_ticks: ResMut, - mut ticks_map: ResMut, - mut server: ResMut, + fn handle_connections_system( + mut server_events: EventReader, mut entity_map: ResMut, + mut clients_info: ResMut, ) { - for client_id in server.clients_id() { - while let Some(message) = server.receive_message(client_id, REPLICATION_CHANNEL_ID) { - match bincode::deserialize::(&message) { - Ok(tick) => { - let acked_tick = acked_ticks.0.entry(client_id).or_default(); - if *acked_tick < tick { - *acked_tick = tick; - entity_map.cleanup_acked(client_id, *acked_tick); - trace!("client {client_id} acknowledged {tick:?}"); - } - } - Err(e) => error!("unable to deserialize tick from client {client_id}: {e}"), + for event in server_events.read() { + match *event { + ServerEvent::ClientDisconnected { client_id, .. } => { + entity_map.0.remove(&client_id); + clients_info.remove(client_id); + } + ServerEvent::ClientConnected { client_id } => { + clients_info.init(client_id); } } } - - ticks_map.cleanup_acked(&acked_ticks) } - fn acks_cleanup_system( - mut server_events: EventReader, - mut acked_ticks: ResMut, + fn acks_receiving_system( + change_tick: SystemChangeTick, + mut server: ResMut, + mut clients_info: ResMut, ) { - for event in server_events.read() { - match event { - ServerEvent::ClientDisconnected { client_id, .. } => { - acked_ticks.0.remove(client_id); - } - ServerEvent::ClientConnected { client_id } => { - acked_ticks.0.entry(*client_id).or_default(); + let ClientsInfo { + info, + entity_buffer, + .. + } = &mut *clients_info; + + for client_info in info.iter_mut() { + while let Some(message) = + server.receive_message(client_info.id, ReplicationChannel::Reliable) + { + match bincode::deserialize::(&message) { + Ok(update_index) => { + let Some((tick, mut entities)) = + client_info.update_entities.remove(&update_index) + else { + error!( + "received unknown update index {update_index} from client {}", + client_info.id + ); + continue; + }; + + for entity in &entities { + let last_tick = client_info + .ticks + .get_mut(entity) + .expect("tick should be inserted on any component insertion"); + + // Received tick could be outdated because we bump it + // if we detect any insertion on the entity in `collect_changes`. + if !last_tick.is_newer_than(tick, change_tick.this_run()) { + *last_tick = tick; + } + } + entities.clear(); + entity_buffer.push(entities); + + trace!( + "client {} acknowledged an update with {tick:?}", + client_info.id + ); + } + Err(e) => error!( + "unable to deserialize update index from client {}: {e}", + client_info.id + ), } } } } - #[allow(clippy::too_many_arguments)] + /// Collects [`ReplicationMessages`] and sends them. + #[allow(clippy::type_complexity)] pub(super) fn replication_sending_system( - mut messages: Local>, + mut messages: Local, change_tick: SystemChangeTick, - mut set: ParamSet<(&World, ResMut, ResMut)>, - acked_ticks: Res, + remove_events: &RemovedComponentEvents, + mut set: ParamSet<( + &World, + ResMut, + ResMut, + ResMut, + ResMut, + )>, + mut removed_replication: RemovedComponents, replication_rules: Res, - despawn_tracker: Res, replicon_tick: Res, - min_replicon_tick: Res, - removal_trackers: Query<(Entity, &RemovalTracker)>, - entity_map: Res, ) -> bincode::Result<()> { - set.p1().0.insert(*replicon_tick, change_tick.this_run()); + let info = mem::take(&mut set.p1().info); // Take ownership to avoid borrowing issues. + messages.prepare(info, *replicon_tick)?; - let messages = prepare_messages( + collect_mappings(&mut messages, &mut set.p2())?; + collect_changes(&mut messages, set.p0(), &change_tick, &replication_rules)?; + collect_removals( &mut messages, - &acked_ticks, - &set.p1(), - *replicon_tick, - *min_replicon_tick, - )?; - - collect_mappings(messages, &entity_map)?; - collect_changes( - messages, - set.p0(), + remove_events, change_tick.this_run(), &replication_rules, )?; - collect_removals(messages, &removal_trackers, change_tick.this_run())?; - collect_despawns(messages, &despawn_tracker, change_tick.this_run())?; + collect_despawns(&mut messages, &mut removed_replication)?; + + let last_change_tick = *set.p4(); + let mut entity_buffer = mem::take(&mut set.p1().entity_buffer); + let (last_change_tick, info) = messages.send( + &mut set.p3(), + &mut entity_buffer, + last_change_tick, + *replicon_tick, + change_tick.this_run(), + )?; - for messages in messages { - messages.send(&mut set.p2()); - } + // Return borrowed data back. + set.p1().info = info; + set.p1().entity_buffer = entity_buffer; + *set.p4() = last_change_tick; Ok(()) } fn reset_system( mut replicon_tick: ResMut, - mut acked_ticks: ResMut, - mut ticks_map: ResMut, + mut entity_map: ResMut, + mut clients_info: ResMut, ) { *replicon_tick = Default::default(); - acked_ticks.0.clear(); - ticks_map.0.clear(); + entity_map.0.clear(); + clients_info.clear(); } } -/// Initializes message for each client and returns it as mutable slice. +/// Collects and writes any new entity mappings that happened in this tick. /// -/// Reuses already allocated messages. -/// Creates new messages if number of clients is bigger then the number of allocated messages. -/// If there are more messages than the number of clients, then the extra messages remain untouched -/// and the returned slice will not include them. -fn prepare_messages<'a>( - messages: &'a mut Vec, - acked_ticks: &AckedTicks, - ticks_map: &TicksMap, - replicon_tick: RepliconTick, - min_replicon_tick: MinRepliconTick, -) -> bincode::Result<&'a mut [ReplicationMessage]> { - messages.reserve(acked_ticks.len()); - for (index, (&client_id, &acked_tick)) in acked_ticks.iter().enumerate() { - let system_tick = *ticks_map.get(&acked_tick).unwrap_or(&Tick::new(0)); - - let send_empty = acked_tick < *min_replicon_tick; - if let Some(message) = messages.get_mut(index) { - message.reset(replicon_tick, client_id, system_tick, send_empty)?; - } else { - messages.push(ReplicationMessage::new( - replicon_tick, - client_id, - system_tick, - send_empty, - )?); - } - } - - Ok(&mut messages[..acked_ticks.len()]) -} - -/// Collect and write any new entity mappings into messages since last acknowledged tick. -/// -/// Mappings will be processed first, so all referenced entities after it will behave correctly. +/// On deserialization mappings should be processed first, so all referenced entities after it will behave correctly. fn collect_mappings( - messages: &mut [ReplicationMessage], - entity_map: &ClientEntityMap, + messages: &mut ReplicationMessages, + entity_map: &mut ClientEntityMap, ) -> bincode::Result<()> { - for message in &mut *messages { + for (message, _, client_info) in messages.iter_mut_with_info() { message.start_array(); - if let Some(mappings) = entity_map.get(&message.client_id) { - for mapping in mappings { - message.write_client_mapping(mapping)?; + if let Some(mappings) = entity_map.0.get_mut(&client_info.id) { + for mapping in mappings.drain(..) { + message.write_client_mapping(&mapping)?; } } @@ -255,15 +260,16 @@ fn collect_mappings( Ok(()) } -/// Collect component changes into messages based on last acknowledged tick. +/// Collects component insertions from this tick into init messages, and changes into update messages +/// since the last entity tick. fn collect_changes( - messages: &mut [ReplicationMessage], + messages: &mut ReplicationMessages, world: &World, - system_tick: Tick, + change_tick: &SystemChangeTick, replication_rules: &ReplicationRules, ) -> bincode::Result<()> { - for message in &mut *messages { - message.start_array(); + for (init_message, _) in messages.iter_mut() { + init_message.start_array(); } for archetype in world @@ -280,8 +286,9 @@ fn collect_changes( .expect("archetype should be valid"); for archetype_entity in archetype.entities() { - for message in &mut *messages { - message.start_entity_data(archetype_entity.entity()); + for (init_message, update_message) in messages.iter_mut() { + init_message.start_entity_data(archetype_entity.entity()); + update_message.start_entity_data(archetype_entity.entity()) } for component_id in archetype.components() { @@ -310,15 +317,15 @@ fn collect_changes( let component = unsafe { column.get_data_unchecked(archetype_entity.table_row()) }; - for message in &mut *messages { - if ticks.is_changed(message.system_tick, system_tick) { - message.write_component( - replication_info, - replication_id, - component, - )?; - } - } + collect_component_change( + messages, + archetype_entity.entity(), + ticks, + change_tick, + replication_info, + replication_id, + component, + )?; } StorageType::SparseSet => { let sparse_set = world @@ -335,80 +342,138 @@ fn collect_changes( .get(entity) .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); - for message in &mut *messages { - if ticks.is_changed(message.system_tick, system_tick) { - message.write_component( - replication_info, - replication_id, - component, - )?; - } - } + collect_component_change( + messages, + entity, + ticks, + change_tick, + replication_info, + replication_id, + component, + )?; } } } - for message in &mut *messages { - message.end_entity_data()?; + for (init_message, update_message, client_info) in messages.iter_mut_with_info() { + if init_message.entity_data_len() != 0 { + // If there is any insertion, include all updates into init message + // and bump the last acknowledged tick to keep entity updates atomic. + init_message.take_entity_data(update_message); + client_info + .ticks + .insert(archetype_entity.entity(), change_tick.this_run()); + } else { + update_message.register_entity(); + update_message.end_entity_data()?; + } + + init_message.end_entity_data()?; } } } - for message in &mut *messages { - message.end_array()?; + for (init_message, _, client_info) in messages.iter_mut_with_info() { + client_info.just_connected = false; + init_message.end_array()?; } Ok(()) } -/// Collect component removals into messages based on last acknowledged tick. +/// Collects the component if it has been changed. +/// +/// If the component was added since the client's last init message, it will be collected into +/// init buffer. +fn collect_component_change( + messages: &mut ReplicationMessages, + entity: Entity, + ticks: ComponentTicks, + change_tick: &SystemChangeTick, + replication_info: &ReplicationInfo, + replication_id: ReplicationId, + component: Ptr, +) -> bincode::Result<()> { + for (init_message, update_message, client_info) in messages.iter_mut_with_info() { + if client_info.just_connected + || ticks.is_added(change_tick.last_run(), change_tick.this_run()) + { + init_message.write_component(replication_info, replication_id, component)?; + } else { + let tick = *client_info + .ticks + .get(&entity) + .expect("entity should be present after adding component"); + if ticks.is_changed(tick, change_tick.this_run()) { + update_message.write_component(replication_info, replication_id, component)?; + } + } + } + + Ok(()) +} + +/// Collects component removals from this tick into init messages. fn collect_removals( - messages: &mut [ReplicationMessage], - removal_trackers: &Query<(Entity, &RemovalTracker)>, - system_tick: Tick, + messages: &mut ReplicationMessages, + remove_events: &RemovedComponentEvents, + tick: Tick, + replication_rules: &ReplicationRules, ) -> bincode::Result<()> { - for message in &mut *messages { + for (message, _) in messages.iter_mut() { message.start_array(); } - for (entity, removal_tracker) in removal_trackers { - for message in &mut *messages { + // PERF: Unfortunately, removed components are grouped by type, not by entity. + // This is why we need an intermediate container. But in practice users rarely + // remove a lot of components in the same tick, so it's probably fine. + let mut removals: EntityHashMap<_, Vec<_>> = Default::default(); + for (&component_id, &replication_id) in replication_rules.get_ids() { + for entity in remove_events + .get(component_id) + .into_iter() + .flat_map(|removed| removed.iter_current_update_events().cloned()) + .map(Into::into) + { + removals.entry(entity).or_default().push(replication_id); + } + } + + for (entity, components) in removals { + for (message, _, client_info) in messages.iter_mut_with_info() { message.start_entity_data(entity); - for (&replication_id, &tick) in &removal_tracker.0 { - if tick.is_newer_than(message.system_tick, system_tick) { - message.write_replication_id(replication_id)?; - } + for &replication_id in &components { + client_info.ticks.insert(entity, tick); + message.write_replication_id(replication_id)?; } message.end_entity_data()?; } } - for message in &mut *messages { + for (message, _) in messages.iter_mut() { message.end_array()?; } Ok(()) } -/// Collect entity despawns into messages based on last acknowledged tick. +/// Collect entity despawns from this tick into init messages. fn collect_despawns( - messages: &mut [ReplicationMessage], - despawn_tracker: &DespawnTracker, - system_tick: Tick, + messages: &mut ReplicationMessages, + removed_replication: &mut RemovedComponents, ) -> bincode::Result<()> { - for message in &mut *messages { + for (message, _) in messages.iter_mut() { message.start_array(); } - for &(entity, tick) in &despawn_tracker.0 { - for message in &mut *messages { - if tick.is_newer_than(message.system_tick, system_tick) { - message.write_entity(entity)?; - } + for entity in removed_replication.read() { + for (message, _, client_info) in messages.iter_mut_with_info() { + client_info.ticks.remove(&entity); + message.write_entity(entity)?; } } - for message in &mut *messages { + for (message, _) in messages.iter_mut() { message.end_array()?; } @@ -444,36 +509,12 @@ pub enum TickPolicy { Manual, } -/// Stores mapping from server ticks to system change ticks. -/// -/// Used only on server. -#[derive(Default, Deref, Resource)] -pub struct TicksMap(HashMap); - -impl TicksMap { - fn cleanup_acked(&mut self, acked_ticks: &AckedTicks) { - self.0 - .retain(|tick, _| acked_ticks.values().any(|acked_tick| acked_tick <= tick)); - } -} - -/// Last acknowledged server ticks for all clients. -/// -/// Used only on server. -#[derive(Default, Deref, Resource)] -pub struct AckedTicks(HashMap); - -/// Contains the lowest replicon tick that should be acknowledged by clients. -/// -/// If a client has not acked this tick, then replication messages >= this tick -/// will be sent even if they do not contain data. +/// Contains the last tick in which a replicated entity was spawned, despawned, or gained/lost a component. /// -/// Used to synchronize server-sent events with clients. A client cannot consume -/// a server-sent event until it has acknowledged the tick where that event was -/// created. This means we need to replicate ticks after a server-sent event is -/// emitted to guarantee the client can eventually consume the event. -#[derive(Clone, Copy, Debug, Default, Deref, DerefMut, Resource)] -pub(super) struct MinRepliconTick(RepliconTick); +/// It should be included in update messages and server events instead of the current tick +/// to avoid needless waiting for the next init message to arrive. +#[derive(Clone, Copy, Debug, Default, Deref, Resource)] +pub struct LastChangeTick(RepliconTick); /** A resource that exists on the server for mapping server entities to @@ -519,7 +560,6 @@ fn confirm_bullet( mut commands: Commands, mut bullet_events: EventReader>, mut entity_map: ResMut, - tick: Res, ) { for FromClient { client_id, event } in bullet_events.read() { let server_entity = commands.spawn(Bullet).id(); // You can insert more components, they will be sent to the client's entity correctly. @@ -527,7 +567,6 @@ fn confirm_bullet( entity_map.insert( *client_id, ClientMapping { - tick: *tick, server_entity, client_entity: event.0, }, @@ -554,22 +593,11 @@ impl ClientEntityMap { pub fn insert(&mut self, client_id: ClientId, mapping: ClientMapping) { self.0.entry(client_id).or_default().push(mapping); } - - /// Removes acknowledged mappings. - fn cleanup_acked(&mut self, client_id: ClientId, acked_tick: RepliconTick) { - if let Some(mappings) = self.0.get_mut(&client_id) { - mappings.retain(|mapping| mapping.tick > acked_tick); - } - } } /// Stores the server entity corresponding to a client's pre-spawned entity. -/// -/// The `tick` is stored here so that this prediction data can be cleaned up once the tick -/// has been acked by the client. #[derive(Debug)] pub struct ClientMapping { - pub tick: RepliconTick, pub server_entity: Entity, pub client_entity: Entity, } diff --git a/src/server/clients_info.rs b/src/server/clients_info.rs new file mode 100644 index 00000000..63f0b3b9 --- /dev/null +++ b/src/server/clients_info.rs @@ -0,0 +1,109 @@ +use bevy::{ + ecs::component::Tick, + prelude::*, + utils::{EntityHashMap, HashMap}, +}; +use bevy_renet::renet::ClientId; + +/// Stores meta-information about connected clients. +#[derive(Default, Resource)] +pub(crate) struct ClientsInfo { + pub(super) info: Vec, + + /// [`Vec`]'s from acknowledged update indexes from [`ClientInfo`]. + /// + /// All data is cleared before the insertion. + /// Stored to reuse allocated capacity. + pub(super) entity_buffer: Vec>, + + /// Disconnected client's [`ClientsInfo`]. + /// + /// [`ClientInfo::clear`] is used before the insertion. + /// Stored to reuse allocated memory. + info_buffer: Vec, +} + +impl ClientsInfo { + /// Initializes a new [`ClientInfo`] for this client. + pub(super) fn init(&mut self, client_id: ClientId) { + let client_info = if let Some(mut client_info) = self.info_buffer.pop() { + client_info.id = client_id; + client_info + } else { + ClientInfo::new(client_id) + }; + + self.info.push(client_info); + } + + /// Removes info for the client. + /// + /// Keeps allocated memory. + pub(super) fn remove(&mut self, client_id: ClientId) { + let index = self + .info + .iter() + .position(|info| info.id == client_id) + .expect("clients info should contain all connected clients"); + let mut client_info = self.info.remove(index); + self.entity_buffer.extend(client_info.reset()); + self.info_buffer.push(client_info); + } + + /// Clears information for all clients. + /// + /// Keeps allocated memory. + pub(super) fn clear(&mut self) { + for mut client_info in self.info.drain(..) { + self.entity_buffer.extend(client_info.reset()); + self.info_buffer.push(client_info); + } + } +} + +pub(super) struct ClientInfo { + pub(super) id: ClientId, + pub(super) just_connected: bool, + pub(super) ticks: EntityHashMap, + pub(super) update_entities: HashMap)>, + next_update_index: u16, +} + +impl ClientInfo { + fn new(id: ClientId) -> Self { + Self { + id, + just_connected: true, + ticks: Default::default(), + update_entities: Default::default(), + next_update_index: Default::default(), + } + } + + /// Resets all data except `id` and drains all [`Vec`]s from update entities mapping. + /// + /// Drained data will be cleared. + /// Keeps allocated memory. + fn reset(&mut self) -> impl Iterator> + '_ { + self.just_connected = true; + self.ticks.clear(); + self.next_update_index = 0; + self.update_entities.drain().map(|(_, (_, mut entities))| { + entities.clear(); + entities + }) + } + + /// Remembers `entities` and `tick` of an update message and returns its index. + /// + /// Used later to acknowledge updated entities. + #[must_use] + pub(super) fn register_update(&mut self, tick: Tick, entities: Vec) -> u16 { + let update_index = self.next_update_index; + self.update_entities.insert(update_index, (tick, entities)); + + self.next_update_index = self.next_update_index.overflowing_add(1).0; + + update_index + } +} diff --git a/src/server/despawn_tracker.rs b/src/server/despawn_tracker.rs deleted file mode 100644 index 80c9961a..00000000 --- a/src/server/despawn_tracker.rs +++ /dev/null @@ -1,95 +0,0 @@ -use bevy::{ - ecs::{component::Tick, system::SystemChangeTick}, - prelude::*, -}; -use bevy_renet::renet::RenetServer; - -use super::{AckedTicks, ServerSet, TicksMap}; -use crate::replicon_core::replication_rules::Replication; - -/// Tracks entity despawns of entities with [`Replication`] component in [`DespawnTracker`] resource. -/// -/// Used only on server. Despawns will be cleaned after all clients acknowledge them. -pub(super) struct DespawnTrackerPlugin; - -impl Plugin for DespawnTrackerPlugin { - fn build(&self, app: &mut App) { - app.init_resource::().add_systems( - PostUpdate, - (Self::cleanup_system, Self::detection_system) - .before(ServerSet::Send) - .run_if(resource_exists::()), - ); - } -} - -impl DespawnTrackerPlugin { - /// Cleanups all acknowledged despawns. - /// - /// Cleans all despawns if [`AckedTicks`] is empty. - fn cleanup_system( - change_tick: SystemChangeTick, - mut despawn_tracker: ResMut, - acked_ticks: Res, - ticks_map: Res, - ) { - despawn_tracker.retain(|(_, tick)| { - acked_ticks.values().any(|acked_tick| { - let system_tick = *ticks_map.get(acked_tick).unwrap_or(&Tick::new(0)); - tick.is_newer_than(system_tick, change_tick.this_run()) - }) - }); - } - - fn detection_system( - change_tick: SystemChangeTick, - mut removed_replications: RemovedComponents, - mut despawn_tracker: ResMut, - ) { - for entity in removed_replications.read() { - despawn_tracker.push((entity, change_tick.this_run())); - } - } -} - -/// Entities and ticks when they were despawned. -#[derive(Default, Resource, Deref, DerefMut)] -pub(crate) struct DespawnTracker(pub(super) Vec<(Entity, Tick)>); - -#[cfg(test)] -mod tests { - use bevy_renet::renet::ClientId; - - use super::*; - use crate::server::RepliconTick; - - #[test] - fn detection() { - let mut app = App::new(); - app.add_plugins(DespawnTrackerPlugin) - .insert_resource(RenetServer::new(Default::default())) - .init_resource::() - .init_resource::(); - - app.update(); - - // To avoid cleanup. - const DUMMY_CLIENT_ID: ClientId = ClientId::from_raw(0); - app.world - .resource_mut::() - .0 - .insert(DUMMY_CLIENT_ID, RepliconTick(0)); - - let replicated_entity = app.world.spawn(Replication).id(); - - app.update(); - - app.world.entity_mut(replicated_entity).despawn(); - - app.update(); - - let despawn_tracker = app.world.resource::(); - assert_eq!(despawn_tracker.len(), 1); - assert_eq!(despawn_tracker.first().unwrap().0, replicated_entity); - } -} diff --git a/src/server/message.rs b/src/server/message.rs deleted file mode 100644 index 3e793f9c..00000000 --- a/src/server/message.rs +++ /dev/null @@ -1,93 +0,0 @@ -pub(super) mod replication_buffer; - -use bevy::{ecs::component::Tick, prelude::*}; -use bevy_renet::renet::{Bytes, ClientId, RenetServer}; - -use crate::replicon_core::{replicon_tick::RepliconTick, REPLICATION_CHANNEL_ID}; -use replication_buffer::ReplicationBuffer; - -/// A reusable message with replicated data for a client. -/// -/// See also [Limits](../index.html#limits) -#[derive(Deref, DerefMut)] -pub(crate) struct ReplicationMessage { - /// ID of a client for which this message is written. - pub(super) client_id: ClientId, - - /// Last system tick acknowledged by the client. - /// - /// Used for changes preparation. - pub(super) system_tick: Tick, - - /// Send message even if it doesn't contain replication data. - /// - /// See also [`Self::send_to`] - send_empty: bool, - - /// Message data. - #[deref] - buffer: ReplicationBuffer, -} - -impl ReplicationMessage { - /// Creates a new message with assigned client ID. - /// - /// `replicon_tick` is the current tick that will be written into - /// the message to read by client on receive. - /// - /// `system_tick` is the last acknowledged system tick for this client. - /// Changes since this tick should be written into the message. - /// - /// If `send_empty` is set to `true`, then [`Self::send_to`] - /// will send the message even if it doesn't contain any data. - pub(super) fn new( - replicon_tick: RepliconTick, - client_id: ClientId, - system_tick: Tick, - send_empty: bool, - ) -> bincode::Result { - let mut buffer = ReplicationBuffer::default(); - buffer.write(&replicon_tick)?; - - Ok(Self { - client_id, - system_tick, - send_empty, - buffer, - }) - } - - /// Clears the message and assigns it to a different client ID. - /// - /// Keeps allocated capacity of the buffer. - pub(super) fn reset( - &mut self, - replicon_tick: RepliconTick, - client_id: ClientId, - system_tick: Tick, - send_empty: bool, - ) -> bincode::Result<()> { - self.client_id = client_id; - self.system_tick = system_tick; - self.send_empty = send_empty; - self.buffer.reset(); - self.buffer.write(&replicon_tick) - } - - /// Sends the message to the designated client. - pub(super) fn send(&mut self, server: &mut RenetServer) { - if !self.buffer.contains_data() && !self.send_empty { - trace!("no changes to send for client {}", self.client_id); - return; - } - - self.buffer.trim_empty_arrays(); - - trace!("sending replication message to client {}", self.client_id); - server.send_message( - self.client_id, - REPLICATION_CHANNEL_ID, - Bytes::copy_from_slice(self.buffer.as_slice()), - ); - } -} diff --git a/src/server/removal_tracker.rs b/src/server/removal_tracker.rs deleted file mode 100644 index 0e589fdd..00000000 --- a/src/server/removal_tracker.rs +++ /dev/null @@ -1,129 +0,0 @@ -use bevy::{ - ecs::{component::Tick, removal_detection::RemovedComponentEvents, system::SystemChangeTick}, - prelude::*, - utils::HashMap, -}; -use bevy_renet::renet::RenetServer; - -use super::{AckedTicks, ServerSet, TicksMap}; -use crate::replicon_core::replication_rules::{Replication, ReplicationId, ReplicationRules}; - -/// Stores component removals in [`RemovalTracker`] component to make them persistent across ticks. -/// -/// Used only on server and tracks only entities with [`Replication`] component. -pub(super) struct RemovalTrackerPlugin; - -impl Plugin for RemovalTrackerPlugin { - fn build(&self, app: &mut App) { - app.add_systems( - PostUpdate, - ( - Self::insertion_system, - Self::cleanup_system, - Self::detection_system.run_if(resource_exists::()), - ) - .before(ServerSet::Send) - .run_if(resource_exists::()), - ); - } -} - -impl RemovalTrackerPlugin { - fn insertion_system( - mut commands: Commands, - new_replicated_entities: Query, Without)>, - ) { - for entity in &new_replicated_entities { - commands.entity(entity).insert(RemovalTracker::default()); - } - } - - /// Cleanups all acknowledged despawns. - fn cleanup_system( - change_tick: SystemChangeTick, - acked_ticks: Res, - ticks_map: Res, - mut removal_trackers: Query<&mut RemovalTracker>, - ) { - for mut removal_tracker in &mut removal_trackers { - removal_tracker.retain(|_, tick| { - acked_ticks.values().any(|acked_tick| { - let system_tick = *ticks_map.get(acked_tick).unwrap_or(&Tick::new(0)); - tick.is_newer_than(system_tick, change_tick.this_run()) - }) - }); - } - } - - fn detection_system( - change_tick: SystemChangeTick, - remove_events: &RemovedComponentEvents, - replication_rules: Res, - mut removal_trackers: Query<&mut RemovalTracker>, - ) { - for (&component_id, &replication_id) in replication_rules.get_ids() { - for entity in remove_events - .get(component_id) - .map(|removed| removed.iter_current_update_events().cloned()) - .into_iter() - .flatten() - .map(Into::into) - { - if let Ok(mut removal_tracker) = removal_trackers.get_mut(entity) { - removal_tracker.insert(replication_id, change_tick.this_run()); - } - } - } - } -} - -#[derive(Component, Default, Deref, DerefMut)] -pub(crate) struct RemovalTracker(pub(super) HashMap); - -#[cfg(test)] -mod tests { - use bevy_renet::renet::ClientId; - use serde::{Deserialize, Serialize}; - - use super::*; - use crate::{replicon_core::replication_rules::AppReplicationExt, server::RepliconTick}; - - #[test] - fn detection() { - let mut app = App::new(); - app.add_plugins(RemovalTrackerPlugin) - .insert_resource(RenetServer::new(Default::default())) - .init_resource::() - .init_resource::() - .init_resource::() - .replicate::(); - - app.update(); - - // To avoid cleanup. - const DUMMY_CLIENT_ID: ClientId = ClientId::from_raw(0); - app.world - .resource_mut::() - .0 - .insert(DUMMY_CLIENT_ID, RepliconTick(0)); - - let replicated_entity = app.world.spawn((DummyComponent, Replication)).id(); - - app.update(); - - app.world - .entity_mut(replicated_entity) - .remove::(); - - app.update(); - - let component_id = app.world.init_component::(); - let replcation_rules = app.world.resource::(); - let (replication_id, _) = replcation_rules.get(component_id).unwrap(); - let removal_tracker = app.world.get::(replicated_entity).unwrap(); - assert!(removal_tracker.contains_key(&replication_id)); - } - - #[derive(Serialize, Deserialize, Component)] - struct DummyComponent; -} diff --git a/src/server/message/replication_buffer.rs b/src/server/replication_buffer.rs similarity index 69% rename from src/server/message/replication_buffer.rs rename to src/server/replication_buffer.rs index bd792bdc..2352a412 100644 --- a/src/server/message/replication_buffer.rs +++ b/src/server/replication_buffer.rs @@ -1,4 +1,7 @@ -use std::{io::Cursor, mem}; +use std::{ + io::{Cursor, Write}, + mem, +}; use bevy::{prelude::*, ptr::Ptr}; use bincode::{DefaultOptions, Options}; @@ -11,17 +14,20 @@ use crate::{ }; /// A reusable buffer with replicated data. -pub(crate) struct ReplicationBuffer { +pub(super) struct ReplicationBuffer { /// Serialized data. cursor: Cursor>, + /// An indicator of whether the array is currently writing. + inside_array: bool, + /// Position of the array from last call of [`Self::start_array`]. array_pos: u64, /// Length of the array that updated automatically after writing data. array_len: u16, - /// The number of arrays excluding trailing empty arrays. + /// The number of arrays excluding empty arrays. arrays_with_data: usize, /// The number of empty arrays at the end. Can be removed using [`Self::trim_empty_arrays`] @@ -33,8 +39,8 @@ pub(crate) struct ReplicationBuffer { /// Position of entity data length from last call of [`Self::write_data_entity`]. entity_data_len_pos: u64, - /// Length of the data for entity that updated automatically after writing data. - entity_data_len: u8, + /// Length in bytes of the component data stored for the currently-being-written entity. + entity_data_len: u16, /// Entity from last call of [`Self::start_entity_data`]. data_entity: Entity, @@ -46,19 +52,39 @@ impl ReplicationBuffer { /// Keeps allocated capacity. pub(super) fn reset(&mut self) { self.cursor.set_position(0); - self.cursor.get_mut().clear(); self.arrays_with_data = 0; self.trailing_empty_arrays = 0; } - /// Returns `true` if the buffer contains at least one non-empty array. - pub(super) fn contains_data(&self) -> bool { - self.arrays_with_data != 0 + /// Returns the number of arrays excluding empty arrays. + pub(super) fn arrays_with_data(&self) -> usize { + self.arrays_with_data + } + + /// Returns position from the last [`Self::start_entity_data`] call. + pub(super) fn entity_data_pos(&self) -> u64 { + self.entity_data_pos + } + + /// Returns length in bytes of the current entity data. + /// + /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. + pub(super) fn entity_data_len(&self) -> u16 { + self.entity_data_len + } + + /// Returns entity from last call of [`Self::start_entity_data`]. + /// + /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. + pub(super) fn data_entity(&self) -> Entity { + self.data_entity } /// Returns the buffer as a byte array. pub(super) fn as_slice(&self) -> &[u8] { - self.cursor.get_ref() + let slice = self.cursor.get_ref(); + let position = self.cursor.position() as usize; + &slice[..position] } /// Writes the `value` into the buffer. @@ -66,6 +92,7 @@ impl ReplicationBuffer { /// Should happen outside of array or entity data and the buffer shouldn't contain trailing empty arrays. /// See also [`Self::start_array`] and [`Self::start_entity_data`]. pub(super) fn write(&mut self, value: &impl Serialize) -> bincode::Result<()> { + debug_assert!(!self.inside_array); debug_assert_eq!(self.array_len, 0); debug_assert_eq!(self.entity_data_len, 0); debug_assert_eq!(self.trailing_empty_arrays, 0); @@ -78,10 +105,12 @@ impl ReplicationBuffer { /// Arrays can contain entity data or despawns inside. /// Length will be increased automatically after writing data. /// See also [`Self::end_array`], [`Self::write_client_mapping`], [`Self::write_entity`] and [`Self::start_entity_data`]. - pub(crate) fn start_array(&mut self) { + pub(super) fn start_array(&mut self) { debug_assert_eq!(self.array_len, 0); + debug_assert!(!self.inside_array); self.array_pos = self.cursor.position(); + self.inside_array = true; self.cursor .set_position(self.array_pos + mem::size_of_val(&self.array_len) as u64); } @@ -89,7 +118,9 @@ impl ReplicationBuffer { /// Ends writing array by writing its length into the last remembered position. /// /// See also [`Self::start_array`]. - pub(crate) fn end_array(&mut self) -> bincode::Result<()> { + pub(super) fn end_array(&mut self) -> bincode::Result<()> { + debug_assert!(self.inside_array); + if self.array_len != 0 { let previous_pos = self.cursor.position(); self.cursor.set_position(self.array_pos); @@ -105,6 +136,7 @@ impl ReplicationBuffer { self.cursor.set_position(self.array_pos); bincode::serialize_into(&mut self.cursor, &self.array_len)?; } + self.inside_array = false; Ok(()) } @@ -114,7 +146,9 @@ impl ReplicationBuffer { /// Should be called only inside array. /// Increases array length by 1. /// See also [`Self::start_array`]. - pub(crate) fn write_client_mapping(&mut self, mapping: &ClientMapping) -> bincode::Result<()> { + pub(super) fn write_client_mapping(&mut self, mapping: &ClientMapping) -> bincode::Result<()> { + debug_assert!(self.inside_array); + serialize_entity(&mut self.cursor, mapping.server_entity)?; serialize_entity(&mut self.cursor, mapping.client_entity)?; self.array_len = self @@ -130,7 +164,9 @@ impl ReplicationBuffer { /// Should be called only inside array. /// Increases array length by 1. /// See also [`Self::start_array`]. - pub(crate) fn write_entity(&mut self, entity: Entity) -> bincode::Result<()> { + pub(super) fn write_entity(&mut self, entity: Entity) -> bincode::Result<()> { + debug_assert!(self.inside_array); + serialize_entity(&mut self.cursor, entity)?; self.array_len = self .array_len @@ -140,14 +176,15 @@ impl ReplicationBuffer { Ok(()) } - /// Starts writing entity and its data as an array element. + /// Starts writing entity and its data. /// /// Data can contain components with their IDs or IDs only. /// Length will be increased automatically after writing data. /// Entity will be written lazily after first data write. + /// Can be called inside and outside of an array. /// See also [`Self::end_entity_data`], [`Self::write_component`] /// and [`Self::write_component_id`]. - pub(crate) fn start_entity_data(&mut self, entity: Entity) { + pub(super) fn start_entity_data(&mut self, entity: Entity) { debug_assert_eq!(self.entity_data_len, 0); self.data_entity = entity; @@ -170,9 +207,10 @@ impl ReplicationBuffer { /// Ends writing entity data by writing its length into the last remembered position. /// /// If the entity data is empty, nothing will be written. + /// Increases array length if writing is done inside an array. /// See also [`Self::start_array`], [`Self::write_component`] and /// [`Self::write_component_id`]. - pub(crate) fn end_entity_data(&mut self) -> bincode::Result<()> { + pub(super) fn end_entity_data(&mut self) -> bincode::Result<()> { if self.entity_data_len != 0 { let previous_pos = self.cursor.position(); self.cursor.set_position(self.entity_data_len_pos); @@ -181,10 +219,12 @@ impl ReplicationBuffer { self.cursor.set_position(previous_pos); self.entity_data_len = 0; - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; + if self.inside_array { + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + } } else { self.cursor.set_position(self.entity_data_pos); } @@ -197,7 +237,7 @@ impl ReplicationBuffer { /// Should be called only inside entity data. /// Increases entity data length by 1. /// See also [`Self::start_entity_data`]. - pub(crate) fn write_component( + pub(super) fn write_component( &mut self, replication_info: &ReplicationInfo, replication_id: ReplicationId, @@ -207,9 +247,17 @@ impl ReplicationBuffer { self.write_data_entity()?; } + let previous_pos = self.cursor.position(); DefaultOptions::new().serialize_into(&mut self.cursor, &replication_id)?; (replication_info.serialize)(ptr, &mut self.cursor)?; - self.entity_data_len += 1; + + let component_len = (self.cursor.position() - previous_pos) + .try_into() + .map_err(|_| bincode::ErrorKind::SizeLimit)?; + self.entity_data_len = self + .entity_data_len + .checked_add(component_len) + .ok_or(bincode::ErrorKind::SizeLimit)?; Ok(()) } @@ -219,7 +267,7 @@ impl ReplicationBuffer { /// Should be called only inside entity data. /// Increases entity data length by 1. /// See also [`Self::start_entity_data`]. - pub(crate) fn write_replication_id( + pub(super) fn write_replication_id( &mut self, replication_id: ReplicationId, ) -> bincode::Result<()> { @@ -233,17 +281,36 @@ impl ReplicationBuffer { Ok(()) } + /// Removes entity data elements from `other` and copies it. + /// + /// Ends entity data for `other`. + /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. + pub(super) fn take_entity_data(&mut self, other: &mut Self) { + if other.entity_data_len != 0 { + let slice = other.as_slice(); + let offset = + other.entity_data_len_pos as usize + mem::size_of_val(&other.entity_data_len); + self.cursor.write_all(&slice[offset..]).unwrap(); + self.entity_data_len += other.entity_data_len; + + other.entity_data_len = 0; + } + + other.cursor.set_position(other.entity_data_pos); + } + /// Crops empty arrays at the end. /// /// Should only be called after all arrays have been written, because - /// removed array somewhere the middle cannot be detected during deserialization. + /// arrays removed somewhere the middle cannot be detected during deserialization. pub(super) fn trim_empty_arrays(&mut self) { + debug_assert!(!self.inside_array); debug_assert_eq!(self.array_len, 0); debug_assert_eq!(self.entity_data_len, 0); - let used_len = self.cursor.get_ref().len() - - self.trailing_empty_arrays * mem::size_of_val(&self.array_len); - self.cursor.get_mut().truncate(used_len); + let extra_len = self.trailing_empty_arrays * mem::size_of_val(&self.array_len); + self.cursor + .set_position(self.cursor.position() - extra_len as u64); } } @@ -253,6 +320,7 @@ impl Default for ReplicationBuffer { cursor: Default::default(), array_pos: Default::default(), array_len: Default::default(), + inside_array: Default::default(), arrays_with_data: Default::default(), trailing_empty_arrays: Default::default(), entity_data_pos: Default::default(), @@ -288,7 +356,6 @@ mod tests { fn trimming_arrays() -> bincode::Result<()> { let mut buffer = ReplicationBuffer::default(); - let begin_len = buffer.cursor.get_ref().len(); for _ in 0..3 { buffer.start_array(); buffer.end_array()?; @@ -296,7 +363,7 @@ mod tests { buffer.trim_empty_arrays(); - assert_eq!(buffer.cursor.get_ref().len(), begin_len); + assert!(buffer.as_slice().is_empty()); Ok(()) } diff --git a/src/server/replication_messages.rs b/src/server/replication_messages.rs new file mode 100644 index 00000000..09028d4a --- /dev/null +++ b/src/server/replication_messages.rs @@ -0,0 +1,299 @@ +use std::mem; + +use bevy::{ecs::component::Tick, prelude::*}; +use bevy_renet::renet::{Bytes, ClientId, RenetServer}; + +use super::{replication_buffer::ReplicationBuffer, ClientInfo, LastChangeTick}; +use crate::replicon_core::{replicon_tick::RepliconTick, ReplicationChannel}; + +/// Accumulates replication messages and sends them to clients. +/// +/// Messages are serialized and deserialized manually because using an intermediate structure +/// leads to allocations and according to our benchmarks it's much slower. +/// +/// Reuses allocated memory from older messages. +#[derive(Default)] +pub(crate) struct ReplicationMessages { + info: Vec, + data: Vec<(InitMessage, UpdateMessage)>, + clients_count: usize, +} + +impl ReplicationMessages { + /// Initializes messages for each client. + /// + /// Reuses already allocated messages. + /// Creates new messages if the number of clients is bigger then the number of allocated messages. + /// If there are more messages than the number of clients, then the extra messages remain untouched + /// and iteration methods will not include them. + pub(super) fn prepare( + &mut self, + info: Vec, + replicon_tick: RepliconTick, + ) -> bincode::Result<()> { + self.clients_count = info.len(); + + self.data.reserve(self.clients_count); + + for index in 0..info.len() { + if let Some((init_message, update_message)) = self.data.get_mut(index) { + init_message.reset(replicon_tick)?; + update_message.reset()?; + } else { + self.data + .push((InitMessage::new(replicon_tick)?, UpdateMessage::default())); + } + } + + self.info = info; + + Ok(()) + } + + /// Returns iterator over messages for each client. + pub(super) fn iter_mut(&mut self) -> impl Iterator { + self.data.iter_mut().take(self.clients_count) + } + + /// Same as [`Self::iter_mut`], but also iterates over clients info. + pub(super) fn iter_mut_with_info( + &mut self, + ) -> impl Iterator { + self.data + .iter_mut() + .take(self.clients_count) + .zip(&mut self.info) + .map(|((init_message, update_message), client_info)| { + (init_message, update_message, client_info) + }) + } + + /// Sends cached messages to clients specified in the last [`Self::prepare`] call. + /// + /// Returns the server's last change tick, which will equal the latest replicon tick if any init + /// messages were sent to clients. If only update messages were sent (or no messages at all) then + /// it will equal the input `last_change_tick`. + pub(super) fn send( + &mut self, + server: &mut RenetServer, + entity_buffer: &mut Vec>, + mut last_change_tick: LastChangeTick, + replicon_tick: RepliconTick, + tick: Tick, + ) -> bincode::Result<(LastChangeTick, Vec)> { + if let Some((init_message, _)) = self.data.first() { + if init_message.is_sendable() { + last_change_tick.0 = replicon_tick; + } + } + + for ((init_message, update_message), client_info) in self + .data + .iter_mut() + .take(self.clients_count) + .zip(&mut self.info) + { + init_message.send(server, client_info.id); + update_message.send( + server, + entity_buffer, + client_info, + last_change_tick, + replicon_tick, + tick, + )?; + } + + Ok((last_change_tick, mem::take(&mut self.info))) + } +} + +/// A reusable message with replicated data. +/// +/// Contains tick and mappings, insertions, removals and despawns that +/// happened on this tick. +/// Sent over [`ReplicationChannel::Reliable`] channel. +/// +/// See also [Limits](../index.html#limits) +#[derive(Deref, DerefMut)] +pub(super) struct InitMessage { + /// Message data. + #[deref] + buffer: ReplicationBuffer, +} + +impl InitMessage { + /// Creates a new message for the specified tick. + fn new(replicon_tick: RepliconTick) -> bincode::Result { + let mut buffer = ReplicationBuffer::default(); + buffer.write(&replicon_tick)?; + + Ok(Self { buffer }) + } + + /// Clears the message and assigns tick to it. + /// + /// Keeps allocated capacity of the buffer. + fn reset(&mut self, replicon_tick: RepliconTick) -> bincode::Result<()> { + self.buffer.reset(); + self.buffer.write(&replicon_tick) + } + + /// Returns `true` is message contains any non-empty arrays. + fn is_sendable(&self) -> bool { + self.buffer.arrays_with_data() != 0 + } + + /// Trims empty arrays from the message and sends it to the specified client. + /// + /// Does nothing if there is no data to send. + fn send(&mut self, server: &mut RenetServer, client_id: ClientId) { + if !self.is_sendable() { + trace!("no init data to send for client {client_id}"); + return; + } + + self.buffer.trim_empty_arrays(); + + trace!("sending init message to client {client_id}"); + server.send_message( + client_id, + ReplicationChannel::Reliable, + Bytes::copy_from_slice(self.buffer.as_slice()), + ); + } +} + +/// A reusable message with replicated component updates. +/// +/// Contains last change tick, current tick and component updates since the last acknowledged tick for each entity. +/// Cannot be applied on the client until the init message matching this update message's last change tick +/// has been applied to the client world. +/// The message will be manually split into packets up to max size, and each packet will be applied +/// independently on the client. +/// Message splits only happen per-entity to avoid weird behavior from partial entity updates. +/// Sent over the [`ReplicationChannel::Unreliable`] channel. +/// +/// See also [Limits](../index.html#limits) +#[derive(Deref, DerefMut, Default)] +pub(super) struct UpdateMessage { + /// Entities and their data sizes. + entities: Vec<(Entity, usize)>, + + /// Message data. + #[deref] + buffer: ReplicationBuffer, +} + +impl UpdateMessage { + /// Clears the message. + /// + /// Keeps allocated capacity of the buffer. + fn reset(&mut self) -> bincode::Result<()> { + self.entities.clear(); + self.buffer.reset(); + + Ok(()) + } + + /// Registers entity from buffer's entity data and its size for possible splitting. + pub(super) fn register_entity(&mut self) { + let data_size = self.buffer.as_slice().len() - self.buffer.entity_data_pos() as usize; + self.entities.push((self.buffer.data_entity(), data_size)); + } + + /// Returns `true` is message contains any written data. + fn is_sendable(&self) -> bool { + !self.buffer.as_slice().is_empty() + } + + /// Splits message according to entities inside it and sends it to the specified client. + /// + /// Does nothing if there is no data to send. + fn send( + &mut self, + server: &mut RenetServer, + entity_buffer: &mut Vec>, + client_info: &mut ClientInfo, + last_change_tick: LastChangeTick, + replicon_tick: RepliconTick, + tick: Tick, + ) -> bincode::Result<()> { + if !self.is_sendable() { + trace!("no updates to send for client {}", client_info.id); + return Ok(()); + } + + trace!("sending update message(s) to client {}", client_info.id); + const TICKS_SIZE: usize = 2 * mem::size_of::(); + let mut header = [0; TICKS_SIZE + mem::size_of::()]; + bincode::serialize_into(&mut header[..], &(*last_change_tick, replicon_tick))?; + + let mut slice = self.buffer.as_slice(); + let mut entities = entity_buffer.pop().unwrap_or_default(); + let mut message_size = 0; + for &(entity, data_size) in &self.entities { + // Try to pack back first, then try to pack forward. + if message_size == 0 + || can_pack(header.len(), message_size, data_size) + || can_pack(header.len(), data_size, message_size) + { + entities.push(entity); + message_size += data_size; + } else { + let (message, remaining) = slice.split_at(message_size); + slice = remaining; + message_size = data_size; + + let update_index = client_info.register_update(tick, entities); + bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; + + server.send_message( + client_info.id, + ReplicationChannel::Unreliable, + Bytes::from_iter(header.into_iter().chain(message.iter().copied())), + ); + + entities = entity_buffer.pop().unwrap_or_default(); + } + } + + if !slice.is_empty() { + let update_index = client_info.register_update(tick, entities); + bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; + + server.send_message( + client_info.id, + ReplicationChannel::Unreliable, + Bytes::from_iter(header.into_iter().chain(slice.iter().copied())), + ); + } + + Ok(()) + } +} + +fn can_pack(header_len: usize, base: usize, add: usize) -> bool { + const MAX_PACKET_SIZE: usize = 1200; // https://github.com/lucaspoffo/renet/blob/acee8b470e34c70d35700d96c00fb233d9cf6919/renet/src/packet.rs#L7 + + let dangling = (base + header_len) % MAX_PACKET_SIZE; + (dangling > 0) && ((dangling + add) <= MAX_PACKET_SIZE) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn packing() { + assert!(can_pack(10, 0, 5)); + assert!(can_pack(10, 0, 1190)); + assert!(!can_pack(10, 0, 1191)); + assert!(!can_pack(10, 0, 3000)); + + assert!(can_pack(10, 1189, 1)); + assert!(!can_pack(10, 1190, 0)); + assert!(!can_pack(10, 1190, 1)); + assert!(!can_pack(10, 1190, 3000)); + } +} diff --git a/tests/replication.rs b/tests/replication.rs index 038a00f8..5d121a52 100644 --- a/tests/replication.rs +++ b/tests/replication.rs @@ -1,5 +1,7 @@ mod common; +use std::ops::DerefMut; + use bevy::prelude::*; use bevy_replicon::{prelude::*, scene}; @@ -7,7 +9,7 @@ use bevy_renet::renet::{transport::NetcodeClientTransport, ClientId}; use serde::{Deserialize, Serialize}; #[test] -fn acked_ticks_cleanup() { +fn reset() { let mut server_app = App::new(); let mut client_app = App::new(); for app in [&mut server_app, &mut client_app] { @@ -19,16 +21,22 @@ fn acked_ticks_cleanup() { common::connect(&mut server_app, &mut client_app); - let mut client_transport = client_app.world.resource_mut::(); - client_transport.disconnect(); - let client_id = ClientId::from_raw(client_transport.client_id()); + client_app.world.resource_mut::().disconnect(); client_app.update(); server_app.update(); + + client_app.update(); + server_app.update(); + + client_app.world.remove_resource::(); + server_app.world.remove_resource::(); + server_app.update(); + client_app.update(); - let acked_ticks = server_app.world.resource::(); - assert!(!acked_ticks.contains_key(&client_id)); + assert_eq!(server_app.world.resource::().get(), 0); + assert_eq!(client_app.world.resource::().get(), 0); } #[test] @@ -93,7 +101,6 @@ fn client_spawn_replication() { let client_entity = client_app.world.spawn_empty().id(); let server_entity = server_app.world.spawn((Replication, TableComponent)).id(); - let tick = *server_app.world.get_resource::().unwrap(); let client_transport = client_app.world.resource::(); let client_id = ClientId::from_raw(client_transport.client_id()); @@ -101,7 +108,6 @@ fn client_spawn_replication() { entity_map.insert( client_id, ClientMapping { - tick, server_entity, client_entity, }, @@ -143,7 +149,6 @@ fn client_spawn_replication() { fn insert_replication() { let mut server_app = App::new(); let mut client_app = App::new(); - for app in [&mut server_app, &mut client_app] { app.add_plugins(( MinimalPlugins, @@ -286,6 +291,280 @@ fn despawn_replication() { assert!(entity_map.to_server().is_empty()); } +#[test] +fn old_entities_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::(); + } + + // Spawn an entity before client connected. + server_app.world.spawn((Replication, TableComponent)); + + common::connect(&mut server_app, &mut client_app); + + assert_eq!(client_app.world.entities().len(), 1); +} + +#[test] +fn update_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + let server_entity = server_app + .world + .spawn((Replication, BoolComponent(false))) + .id(); + + server_app.update(); + client_app.update(); + + let mut component = server_app + .world + .get_mut::(server_entity) + .unwrap(); + component.0 = true; + + server_app.update(); + client_app.update(); + + let component = client_app + .world + .query::<&BoolComponent>() + .single(&client_app.world); + assert!(component.0); +} + +#[test] +fn big_entity_update_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + let server_entity = server_app + .world + .spawn((Replication, VecComponent::default())) + .id(); + + server_app.update(); + client_app.update(); + + // To exceed packed size. + const BIG_DATA: &[u8] = &[0; 1200]; + let mut component = server_app + .world + .get_mut::(server_entity) + .unwrap(); + component.0 = BIG_DATA.to_vec(); + + server_app.update(); + client_app.update(); + + let component = client_app + .world + .query::<&VecComponent>() + .single(&client_app.world); + assert_eq!(component.0, BIG_DATA); +} + +#[test] +fn many_entities_update_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + // Spawn many entities to cover message splitting. + const ENTITIES_COUNT: u32 = 300; + server_app + .world + .spawn_batch([(Replication, BoolComponent(false)); ENTITIES_COUNT as usize]); + + server_app.update(); + client_app.update(); + + assert_eq!(client_app.world.entities().len(), ENTITIES_COUNT); + + for mut component in server_app + .world + .query::<&mut BoolComponent>() + .iter_mut(&mut server_app.world) + { + component.0 = true; + } + + server_app.update(); + client_app.update(); + + for component in client_app + .world + .query::<&BoolComponent>() + .iter(&client_app.world) + { + assert!(component.0); + } +} + +#[test] +fn insert_update_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::() + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + let server_entity = server_app + .world + .spawn((Replication, BoolComponent(false))) + .id(); + + server_app.update(); + client_app.update(); + + let mut server_entity = server_app.world.entity_mut(server_entity); + server_entity.get_mut::().unwrap().0 = true; + server_entity.insert(TableComponent); + + server_app.update(); + client_app.update(); + + let component = client_app + .world + .query_filtered::<&BoolComponent, With>() + .single(&client_app.world); + assert!(component.0); +} + +#[test] +fn despawn_update_replication() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::() + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + let server_entity = server_app + .world + .spawn((Replication, BoolComponent(false))) + .id(); + + server_app.update(); + client_app.update(); + + let mut component = server_app + .world + .get_mut::(server_entity) + .unwrap(); + component.0 = true; + + // Update without client to send update message. + server_app.update(); + + server_app.world.despawn(server_entity); + + server_app.update(); + client_app.update(); + + assert!(client_app.world.entities().is_empty()); +} + +#[test] +fn update_replication_buffering() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), + )) + .replicate::(); + } + + common::connect(&mut server_app, &mut client_app); + + let server_entity = server_app + .world + .spawn((Replication, BoolComponent(false))) + .id(); + + let old_tick = *server_app.world.resource::(); + + server_app.update(); + client_app.update(); + + // Artificially rollback the client by 1 tick to force next received update to be buffered. + *client_app.world.resource_mut::() = old_tick; + let mut component = server_app + .world + .get_mut::(server_entity) + .unwrap(); + component.0 = true; + + server_app.update(); + client_app.update(); + + let (client_entity, component) = client_app + .world + .query::<(Entity, &BoolComponent)>() + .single(&client_app.world); + assert!(!component.0, "client should buffer the update"); + + // Move tick forward to let the buffered update apply. + client_app.world.resource_mut::().increment(); + + server_app.update(); + client_app.update(); + + let component = client_app + .world + .get::(client_entity) + .unwrap(); + assert!(component.0, "buffered update should be applied"); +} + #[test] fn replication_into_scene() { let mut app = App::new(); @@ -338,15 +617,12 @@ fn diagnostics() { let client_entity = client_app.world.spawn_empty().id(); let server_entity = server_app.world.spawn((Replication, TableComponent)).id(); - let tick = *server_app.world.get_resource::().unwrap(); let client_transport = client_app.world.resource::(); let client_id = ClientId::from_raw(client_transport.client_id()); - let mut entity_map = server_app.world.resource_mut::(); entity_map.insert( client_id, ClientMapping { - tick, server_entity, client_entity, }, @@ -357,13 +633,23 @@ fn diagnostics() { server_app.update(); client_app.update(); + // Trigger change detection. + server_app + .world + .get_mut::(server_entity) + .unwrap() + .deref_mut(); + + server_app.update(); + client_app.update(); + let stats = client_app.world.resource::(); - assert_eq!(stats.entities_changed, 1); - assert_eq!(stats.components_changed, 1); + assert_eq!(stats.entities_changed, 2); + assert_eq!(stats.components_changed, 2); assert_eq!(stats.mappings, 1); assert_eq!(stats.despawns, 1); - assert_eq!(stats.packets, 1); - assert_eq!(stats.bytes, 18); + assert_eq!(stats.packets, 2); + assert_eq!(stats.bytes, 33); } #[derive(Component, Deserialize, Serialize)] @@ -388,6 +674,12 @@ struct NonReplicatingComponent; #[derive(Component, Deserialize, Serialize)] struct IgnoredComponent; +#[derive(Clone, Component, Copy, Deserialize, Serialize)] +struct BoolComponent(bool); + +#[derive(Component, Default, Deserialize, Serialize)] +struct VecComponent(Vec); + #[derive(Component, Default, Deserialize, Reflect, Serialize)] #[reflect(Component)] struct ReflectedComponent; diff --git a/tests/server_event.rs b/tests/server_event.rs index c4f40838..de25b2a8 100644 --- a/tests/server_event.rs +++ b/tests/server_event.rs @@ -5,6 +5,7 @@ use bevy_renet::renet::{transport::NetcodeClientTransport, ClientId}; use bevy_replicon::prelude::*; use common::DummyEvent; +use serde::{Deserialize, Serialize}; #[test] fn without_server_plugin() { @@ -160,45 +161,38 @@ fn event_queue() { MinimalPlugins, ReplicationPlugins.set(ServerPlugin::new(TickPolicy::EveryFrame)), )) + .replicate::() .add_server_event::(EventType::Ordered); } common::connect(&mut server_app, &mut client_app); - // Simulate event that received two ticks earlier. - let mut tick = *server_app.world.resource::(); - tick.increment_by(2); - client_app - .world - .resource_mut::>() - .insert(tick, DummyEvent(Entity::PLACEHOLDER)); - - // Send another event to trigger world update. - server_app - .world - .resource_mut::>>() - .send(ToClients { - mode: SendMode::Broadcast, - event: DummyEvent(Entity::PLACEHOLDER), - }); + // Spawn entity to trigger world change. + server_app.world.spawn((Replication, DummyComponent)); + let old_tick = *server_app.world.resource::(); server_app.update(); client_app.update(); - let mut dummy_events = client_app.world.resource_mut::>(); - assert_eq!( - dummy_events.drain().count(), - 1, - "should emit only single event for current tick" - ); + // Artificially rollback the client by 1 tick to force next received event to be queued. + *client_app.world.resource_mut::() = old_tick; + server_app.world.send_event(ToClients { + mode: SendMode::Broadcast, + event: DummyEvent(Entity::PLACEHOLDER), + }); server_app.update(); client_app.update(); + assert!(client_app.world.resource::>().is_empty()); + + client_app.world.resource_mut::().increment(); + + client_app.update(); + let dummy_events = client_app.world.resource::>(); - assert_eq!( - dummy_events.len(), - 1, - "should emit another event received earlier" - ); + assert_eq!(dummy_events.len(), 1); } + +#[derive(Component, Serialize, Deserialize)] +struct DummyComponent;