diff --git a/crates/bevy_pbr/Cargo.toml b/crates/bevy_pbr/Cargo.toml
index 6d1fce72bfd1f..1666c03c1043b 100644
--- a/crates/bevy_pbr/Cargo.toml
+++ b/crates/bevy_pbr/Cargo.toml
@@ -22,7 +22,7 @@ bluenoise_texture = ["bevy_image/ktx2", "bevy_image/zstd"]
 shader_format_glsl = ["bevy_shader/shader_format_glsl"]
 trace = ["bevy_render/trace"]
 # Enables the meshlet renderer for dense high-poly scenes (experimental)
-meshlet = ["dep:lz4_flex", "dep:range-alloc", "dep:bevy_tasks"]
+meshlet = ["dep:lz4_flex", "dep:range-alloc"]
 # Enables processing meshes into meshlet meshes
 meshlet_processor = [
     "meshlet",
@@ -56,7 +56,7 @@ bevy_render = { path = "../bevy_render", version = "0.19.0-dev", features = [
     "morph",
 ] }
 bevy_camera = { path = "../bevy_camera", version = "0.19.0-dev" }
-bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev", optional = true }
+bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev" }
 bevy_transform = { path = "../bevy_transform", version = "0.19.0-dev" }
 bevy_utils = { path = "../bevy_utils", version = "0.19.0-dev" }
 bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-features = false, features = [
diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs
index e06d2920ce137..43780603b2767 100644
--- a/crates/bevy_pbr/src/render/mesh.rs
+++ b/crates/bevy_pbr/src/render/mesh.rs
@@ -63,11 +63,11 @@ use bevy_render::{
 };
 use bevy_shader::{load_shader_library, Shader, ShaderDefVal, ShaderSettings};
 use bevy_transform::components::GlobalTransform;
-use bevy_utils::{default, Parallel, TypeIdMap};
+use bevy_utils::{default, BufferedChannel, Parallel, TypeIdMap};
 use core::any::TypeId;
 use core::mem::size_of;
 use material_bind_groups::MaterialBindingId;
-use tracing::{error, warn};
+use tracing::{error, info_span, warn, Instrument};
 
 use self::irradiance_volume::IRRADIANCE_VOLUMES_ARE_USABLE;
 use crate::{
@@ -90,6 +90,7 @@ use bevy_render::prelude::Msaa;
 use bevy_render::sync_world::{MainEntity, MainEntityHashMap};
 use bevy_render::view::ExtractedView;
 use bevy_render::RenderSystems::PrepareAssets;
+use bevy_tasks::ComputeTaskPool;
 use bytemuck::{Pod, Zeroable};
 use nonmax::{NonMaxU16, NonMaxU32};
 
@@ -844,7 +845,7 @@ pub struct MeshesToReextractNextFrame(MainEntityHashSet);
 
 impl RenderMeshInstanceShared {
     /// A gpu builder will provide the mesh instance id
-    /// during [`RenderMeshInstanceGpuBuilder::update`].
+    /// during [`RenderMeshInstanceGpuPrepared::update`].
     fn for_gpu_building(
         previous_transform: Option<&PreviousGlobalTransform>,
         mesh: &Mesh3d,
@@ -866,7 +867,7 @@
         )
     }
 
-    /// The cpu builder does not have an equivalent [`RenderMeshInstanceGpuBuilder::update`].
+    /// The cpu builder does not have an equivalent [`RenderMeshInstanceGpuBuilder`].
     fn for_cpu_building(
         previous_transform: Option<&PreviousGlobalTransform>,
         mesh: &Mesh3d,
@@ -1131,22 +1132,33 @@ impl RenderMeshInstanceGpuQueue {
 }
 
 impl RenderMeshInstanceGpuBuilder {
-    /// Flushes this mesh instance to the [`RenderMeshInstanceGpu`] and
-    /// [`MeshInputUniform`] tables, replacing the existing entry if applicable.
-    fn update(
+    /// Prepares the data needed to update the mesh instance.
+    ///
+    /// This is the thread-safe part of the update.
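+    ///
+    /// It borrows only shared lookup tables, so many mesh instances can be
+    /// prepared concurrently; the `&mut` buffer writes happen afterwards in the
+    /// serial [`RenderMeshInstanceGpuPrepared::update`] step.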
+    fn prepare(
         mut self,
         entity: MainEntity,
-        render_mesh_instances: &mut MainEntityHashMap<RenderMeshInstanceGpu>,
-        current_input_buffer: &mut InstanceInputUniformBuffer,
-        previous_input_buffer: &mut InstanceInputUniformBuffer,
         mesh_allocator: &MeshAllocator,
         mesh_material_ids: &RenderMaterialInstances,
         render_material_bindings: &RenderMaterialBindings,
         render_lightmaps: &RenderLightmaps,
         skin_uniforms: &SkinUniforms,
         timestamp: FrameCount,
-        meshes_to_reextract_next_frame: &mut MeshesToReextractNextFrame,
-    ) -> Option<u32> {
+    ) -> Option<RenderMeshInstanceGpuPrepared> {
+        // Look up the material index. If we couldn't fetch the material index,
+        // then the material hasn't been prepared yet, perhaps because it hasn't
+        // yet loaded. In that case, we return None so that
+        // `collect_meshes_for_gpu_building` will add the mesh to
+        // `meshes_to_reextract_next_frame` and bail.
+        let mesh_material = mesh_material_ids.mesh_material(entity);
+        let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() {
+            render_material_bindings.get(&mesh_material).copied()?
+        } else {
+            // Use a dummy material binding ID.
+            MaterialBindingId::default()
+        };
+        self.shared.material_bindings_index = mesh_material_binding_id;
+
         let (first_vertex_index, vertex_count) =
             match mesh_allocator.mesh_vertex_slice(&self.shared.mesh_asset_id) {
                 Some(mesh_vertex_slice) => (
@@ -1169,25 +1181,6 @@
             None => u32::MAX,
         };
 
-        // Look up the material index. If we couldn't fetch the material index,
-        // then the material hasn't been prepared yet, perhaps because it hasn't
-        // yet loaded. In that case, add the mesh to
-        // `meshes_to_reextract_next_frame` and bail.
-        let mesh_material = mesh_material_ids.mesh_material(entity);
-        let mesh_material_binding_id = if mesh_material != DUMMY_MESH_MATERIAL.untyped() {
-            match render_material_bindings.get(&mesh_material) {
-                Some(binding_id) => *binding_id,
-                None => {
-                    meshes_to_reextract_next_frame.insert(entity);
-                    return None;
-                }
-            }
-        } else {
-            // Use a dummy material binding ID.
-            MaterialBindingId::default()
-        };
-        self.shared.material_bindings_index = mesh_material_binding_id;
-
         let lightmap_slot = match render_lightmaps.render_lightmaps.get(&entity) {
             Some(render_lightmap) => u16::from(*render_lightmap.slot_index),
             None => u16::MAX,
@@ -1199,7 +1192,7 @@
         self.shared.lightmap_slab_index = lightmap_slab_index;
 
         // Create the mesh input uniform.
-        let mut mesh_input_uniform = MeshInputUniform {
+        let mesh_input_uniform = MeshInputUniform {
             world_from_local: self.world_from_local.to_transpose(),
             lightmap_uv_rect: self.lightmap_uv_rect,
             flags: self.mesh_flags.bits(),
@@ -1220,12 +1213,39 @@
             pad: 0,
         };
 
-        // Did the last frame contain this entity as well?
-        let current_uniform_index;
         let world_from_local = &self.world_from_local;
         let center = world_from_local.matrix3.mul_vec3(self.shared.center)
             + world_from_local.translation;
 
+        Some(RenderMeshInstanceGpuPrepared {
+            shared: self.shared,
+            mesh_input_uniform,
+            center,
+        })
+    }
+}
+
+pub struct RenderMeshInstanceGpuPrepared {
+    /// Data shared between the CPU and GPU versions of this mesh instance.
+    shared: RenderMeshInstanceShared,
+    /// The data that will be uploaded to the GPU as a [`MeshInputUniform`].
+    mesh_input_uniform: MeshInputUniform,
+    /// The world-space center of the mesh instance, used for culling and sorting.
+    center: Vec3,
+}
+
+impl RenderMeshInstanceGpuPrepared {
+    /// Flushes this mesh instance to the [`RenderMeshInstanceGpu`] and
+    /// [`MeshInputUniform`] tables, replacing the existing entry if applicable.
+    fn update(
+        mut self,
+        entity: MainEntity,
+        render_mesh_instances: &mut MainEntityHashMap<RenderMeshInstanceGpu>,
+        current_input_buffer: &mut InstanceInputUniformBuffer,
+        previous_input_buffer: &mut InstanceInputUniformBuffer,
+    ) -> Option<u32> {
+        // Did the last frame contain this entity as well?
+        let current_uniform_index;
         match render_mesh_instances.entry(entity) {
             Entry::Occupied(mut occupied_entry) => {
                 // Yes, it did. Replace its entry with the new one.
@@ -1238,15 +1258,15 @@
                 let previous_mesh_input_uniform =
                     current_input_buffer.get_unchecked(current_uniform_index);
                 let previous_input_index = previous_input_buffer.add(previous_mesh_input_uniform);
-                mesh_input_uniform.previous_input_index = previous_input_index;
+                self.mesh_input_uniform.previous_input_index = previous_input_index;
 
                 // Write in the new mesh input uniform.
-                current_input_buffer.set(current_uniform_index, mesh_input_uniform);
+                current_input_buffer.set(current_uniform_index, self.mesh_input_uniform);
 
                 occupied_entry.replace_entry_with(|_, _| {
                     Some(RenderMeshInstanceGpu {
                         shared: self.shared,
-                        center,
+                        center: self.center,
                         current_uniform_index: NonMaxU32::new(current_uniform_index)
                             .unwrap_or_default(),
                     })
@@ -1255,11 +1275,11 @@
 
             Entry::Vacant(vacant_entry) => {
                 // No, this is a new entity. Push its data on to the buffer.
-                current_uniform_index = current_input_buffer.add(mesh_input_uniform);
+                current_uniform_index = current_input_buffer.add(self.mesh_input_uniform);
 
                 vacant_entry.insert(RenderMeshInstanceGpu {
                     shared: self.shared,
-                    center,
+                    center: self.center,
                     current_uniform_index: NonMaxU32::new(current_uniform_index)
                         .unwrap_or_default(),
                 });
@@ -1741,8 +1761,18 @@
     }
 }
 
+#[derive(Default)]
+pub struct GpuMeshBuildingChunks {
+    prepared: BufferedChannel<(
+        MainEntity,
+        RenderMeshInstanceGpuPrepared,
+        Option<MeshCullingData>,
+    )>,
+    reextract: BufferedChannel<MainEntity>,
+    removed: BufferedChannel<MainEntity>,
+}
+
-/// Creates the [`RenderMeshInstanceGpu`]s and [`MeshInputUniform`]s when GPU
-/// mesh uniforms are built.
+/// Creates the [`RenderMeshInstanceGpu`]s and [`MeshInputUniform`]s when GPU mesh uniforms are built.
 pub fn collect_meshes_for_gpu_building(
     render_mesh_instances: ResMut<RenderMeshInstances>,
     batched_instance_buffers: ResMut<
@@ -1757,6 +1787,7 @@
     skin_uniforms: Res<SkinUniforms>,
     frame_count: Res<FrameCount>,
     mut meshes_to_reextract_next_frame: ResMut<MeshesToReextractNextFrame>,
+    chunks: Local<GpuMeshBuildingChunks>,
 ) {
     let RenderMeshInstances::GpuBuilding(render_mesh_instances) =
         render_mesh_instances.into_inner()
@@ -1773,83 +1804,157 @@
         previous_input_buffer,
         ..
     } = batched_instance_buffers.into_inner();
-
     previous_input_buffer.clear();
 
-    // Build the [`RenderMeshInstance`]s and [`MeshInputUniform`]s.
-
-    for queue in render_mesh_instance_queues.iter_mut() {
-        match *queue {
-            RenderMeshInstanceGpuQueue::None => {
-                // This can only happen if the queue is empty.
+    // Channels used by parallel workers to send data to the single consumer.
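+    // `prepared` carries meshes that are ready to be flushed to the GPU buffers,
+    // `reextract` carries meshes whose materials aren't prepared yet, and
+    // `removed` carries meshes whose input uniforms need to be cleaned up.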
+    let (prepared_rx, prepared_tx) = chunks.prepared.unbounded();
+    let (reextract_rx, reextract_tx) = chunks.reextract.unbounded();
+    let (removed_rx, removed_tx) = chunks.removed.unbounded();
+
+    // Reference data shared between tasks
+    let mesh_allocator = &mesh_allocator;
+    let mesh_material_ids = &mesh_material_ids;
+    let render_material_bindings = &render_material_bindings;
+    let render_lightmaps = &render_lightmaps;
+    let skin_uniforms = &skin_uniforms;
+    let frame_count = *frame_count;
+
+    // Spawn workers on the taskpool to prepare and update meshes in parallel.
+    ComputeTaskPool::get().scope(|scope| {
+        // This worker is the bottleneck of mesh preparation and can only run serially, so we want
+        // it to start working immediately. As soon as the parallel workers produce chunks of
+        // prepared meshes, this worker will consume them and update the GPU buffers.
+        scope.spawn(
+            async move {
+                while let Ok(mut batch) = prepared_rx.recv().await {
+                    for (entity, prepared, mesh_culling_builder) in batch.drain() {
+                        let Some(instance_data_index) = prepared.update(
+                            entity,
+                            &mut *render_mesh_instances,
+                            current_input_buffer,
+                            previous_input_buffer,
+                        ) else {
+                            continue;
+                        };
+                        if let Some(mesh_culling_data) = mesh_culling_builder {
+                            mesh_culling_data.update(
+                                &mut mesh_culling_data_buffer,
+                                instance_data_index as usize,
+                            );
+                        }
+                    }
+                }
+                while let Ok(mut batch) = removed_rx.recv().await {
+                    for entity in batch.drain() {
+                        remove_mesh_input_uniform(
+                            entity,
+                            &mut *render_mesh_instances,
+                            current_input_buffer,
+                        );
+                    }
+                }
+                while let Ok(mut batch) = reextract_rx.recv().await {
+                    for entity in batch.drain() {
+                        meshes_to_reextract_next_frame.insert(entity);
+                    }
+                }
+                // Buffers can't be empty. Make sure there's something in the previous input buffer.
+                previous_input_buffer.ensure_nonempty();
             }
+            .instrument(info_span!("collect_meshes_consumer")),
+        );
 
-            RenderMeshInstanceGpuQueue::CpuCulling {
-                ref mut changed,
-                ref mut removed,
-            } => {
-                for (entity, mesh_instance_builder) in changed.drain(..) {
-                    mesh_instance_builder.update(
-                        entity,
-                        &mut *render_mesh_instances,
-                        current_input_buffer,
-                        previous_input_buffer,
-                        &mesh_allocator,
-                        &mesh_material_ids,
-                        &render_material_bindings,
-                        &render_lightmaps,
-                        &skin_uniforms,
-                        *frame_count,
-                        &mut meshes_to_reextract_next_frame,
-                    );
+        // Iterate through each queue, spawning a task for each queue. This loop completes quickly
+        // as it does very little work, it is just spawning and moving data into tasks in a loop.
+        for queue in render_mesh_instance_queues.iter_mut() {
+            match *queue {
+                RenderMeshInstanceGpuQueue::None => {
+                    // This can only happen if the queue is empty.
                 }
-                for entity in removed.drain(..) {
-                    remove_mesh_input_uniform(
-                        entity,
-                        &mut *render_mesh_instances,
-                        current_input_buffer,
-                    );
-                }
-            }
+                RenderMeshInstanceGpuQueue::CpuCulling {
+                    ref mut changed,
+                    ref mut removed,
+                } => {
+                    let mut prepared_tx = prepared_tx.clone();
+                    let mut reextract_tx = reextract_tx.clone();
+                    let mut removed_tx = removed_tx.clone();
+                    scope.spawn(async move {
+                        let _span = info_span!("prepared_mesh_producer").entered();
+                        changed
+                            .drain(..)
+                            .for_each(
+                                |(entity, mesh_instance_builder)| match mesh_instance_builder
+                                    .prepare(
+                                        entity,
+                                        mesh_allocator,
+                                        mesh_material_ids,
+                                        render_material_bindings,
+                                        render_lightmaps,
+                                        skin_uniforms,
+                                        frame_count,
+                                    ) {
+                                    Some(prepared) => {
+                                        prepared_tx.send_blocking((entity, prepared, None)).ok();
+                                    }
+                                    None => {
+                                        reextract_tx.send_blocking(entity).ok();
+                                    }
+                                },
+                            );
 
-            RenderMeshInstanceGpuQueue::GpuCulling {
-                ref mut changed,
-                ref mut removed,
-            } => {
-                for (entity, mesh_instance_builder, mesh_culling_builder) in changed.drain(..) {
-                    let Some(instance_data_index) = mesh_instance_builder.update(
-                        entity,
-                        &mut *render_mesh_instances,
-                        current_input_buffer,
-                        previous_input_buffer,
-                        &mesh_allocator,
-                        &mesh_material_ids,
-                        &render_material_bindings,
-                        &render_lightmaps,
-                        &skin_uniforms,
-                        *frame_count,
-                        &mut meshes_to_reextract_next_frame,
-                    ) else {
-                        continue;
-                    };
-                    mesh_culling_builder
-                        .update(&mut mesh_culling_data_buffer, instance_data_index as usize);
+                        for entity in removed.drain(..) {
+                            removed_tx.send_blocking(entity).unwrap();
+                        }
+                    });
                 }
-                for entity in removed.drain(..) {
-                    remove_mesh_input_uniform(
-                        entity,
-                        &mut *render_mesh_instances,
-                        current_input_buffer,
-                    );
+                RenderMeshInstanceGpuQueue::GpuCulling {
+                    ref mut changed,
+                    ref mut removed,
+                } => {
+                    let mut prepared_tx = prepared_tx.clone();
+                    let mut reextract_tx = reextract_tx.clone();
+                    let mut removed_tx = removed_tx.clone();
+                    scope.spawn(async move {
+                        let _span = info_span!("prepared_mesh_producer").entered();
+                        changed.drain(..).for_each(
+                            |(entity, mesh_instance_builder, mesh_culling_builder)| {
+                                match mesh_instance_builder.prepare(
+                                    entity,
+                                    mesh_allocator,
+                                    mesh_material_ids,
+                                    render_material_bindings,
+                                    render_lightmaps,
+                                    skin_uniforms,
+                                    frame_count,
+                                ) {
+                                    Some(prepared) => {
+                                        let data = (entity, prepared, Some(mesh_culling_builder));
+                                        prepared_tx.send_blocking(data).ok();
+                                    }
+                                    None => {
+                                        reextract_tx.send_blocking(entity).ok();
+                                    }
+                                }
+                            },
+                        );
+
+                        for entity in removed.drain(..) {
+                            removed_tx.send_blocking(entity).unwrap();
+                        }
+                    });
                 }
             }
         }
-    }
 
-    // Buffers can't be empty. Make sure there's something in the previous input buffer.
-    previous_input_buffer.ensure_nonempty();
+        // Drop the senders owned by the scope, so the only senders left are those captured by the
+        // spawned tasks. When the tasks are complete, the channels will close, and the consumer
+        // will finish. Without this, the scope would deadlock on the blocked consumer.
+        drop(prepared_tx);
+        drop(reextract_tx);
+        drop(removed_tx);
+    });
 }
 
 /// All data needed to construct a pipeline for rendering 3D meshes.
diff --git a/crates/bevy_platform/Cargo.toml b/crates/bevy_platform/Cargo.toml
index 5d2df33d50b66..6a385e64f7496 100644
--- a/crates/bevy_platform/Cargo.toml
+++ b/crates/bevy_platform/Cargo.toml
@@ -21,6 +21,12 @@ rayon = ["dep:rayon", "hashbrown/rayon"]
 
 # Platform Compatibility
 
+## Provides an implementation of `block_on` from `futures-lite`.
+futures-lite = ["std", "dep:futures-lite", "futures-lite?/std"]
+
+## Provides an implementation of `block_on` from `async-io`.
+async-io = ["std", "dep:async-io"]
+
 ## Allows access to the `std` crate. Enabling this feature will prevent compilation
 ## on `no_std` targets, but provides access to certain additional features on
 ## supported platforms.
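These two features select the backend for `bevy_platform::future::block_on`, which is added later in this patch. A minimal sketch of how downstream code calls it, identical under all three backends (the `answer` future here is illustrative, not part of the PR):

```rust
use bevy_platform::future::block_on;

// Any future works; this one is trivially ready after one poll.
async fn answer() -> u32 {
    21 * 2
}

fn main() {
    // Resolves to async-io's or futures-lite's implementation when the
    // corresponding feature is enabled, and otherwise falls back to the
    // busy-wait loop defined in `future.rs` below.
    assert_eq!(block_on(answer()), 42);
}
```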
@@ -72,6 +78,8 @@ hashbrown = { version = "0.16.1", features = [
 ], optional = true, default-features = false }
 serde = { version = "1", default-features = false, optional = true }
 rayon = { version = "1", default-features = false, optional = true }
+futures-lite = { version = "2.0.1", default-features = false, optional = true }
+async-io = { version = "2.0.0", optional = true }
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 web-time = { version = "1.1", default-features = false, optional = true }
diff --git a/crates/bevy_platform/src/future.rs b/crates/bevy_platform/src/future.rs
new file mode 100644
index 0000000000000..e733183752ae9
--- /dev/null
+++ b/crates/bevy_platform/src/future.rs
@@ -0,0 +1,32 @@
+//! Platform-aware future utilities.
+
+crate::cfg::switch! {
+    #[cfg(feature = "async-io")] => {
+        pub use async_io::block_on;
+    }
+    #[cfg(feature = "futures-lite")] => {
+        pub use futures_lite::future::block_on;
+    }
+    _ => {
+        /// Blocks on the supplied `future`.
+        /// This implementation will busy-wait until it is completed.
+        /// Consider enabling the `async-io` or `futures-lite` features.
+        pub fn block_on<T>(future: impl Future<Output = T>) -> T {
+            use core::task::{Poll, Context};
+
+            // Pin the future on the stack.
+            let mut future = core::pin::pin!(future);
+
+            // We don't care about the waker as we're just going to poll as fast as possible.
+            let cx = &mut Context::from_waker(core::task::Waker::noop());
+
+            // Keep polling until the future is ready.
+            loop {
+                match future.as_mut().poll(cx) {
+                    Poll::Ready(output) => return output,
+                    Poll::Pending => core::hint::spin_loop(),
+                }
+            }
+        }
+    }
+}
diff --git a/crates/bevy_platform/src/lib.rs b/crates/bevy_platform/src/lib.rs
index 4c0b4e4621163..b4ff694f1187f 100644
--- a/crates/bevy_platform/src/lib.rs
+++ b/crates/bevy_platform/src/lib.rs
@@ -21,6 +21,7 @@ cfg::alloc! {
 
 pub mod cell;
 pub mod cfg;
+pub mod future;
 pub mod hash;
 pub mod sync;
 pub mod thread;
diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml
index 25ee59662d918..e5fabf05a00a5 100644
--- a/crates/bevy_tasks/Cargo.toml
+++ b/crates/bevy_tasks/Cargo.toml
@@ -25,11 +25,11 @@ multi_threaded = [
 async_executor = ["bevy_platform/std", "dep:async-executor", "futures-lite"]
 
 # Provide an implementation of `block_on` from `futures-lite`.
-futures-lite = ["bevy_platform/std", "futures-lite/std"]
+futures-lite = ["bevy_platform/futures-lite"]
 
 # Use async-io's implementation of block_on instead of futures-lite's implementation.
 # This is preferred if your application uses async-io.
-async-io = ["bevy_platform/std", "dep:async-io"]
+async-io = ["bevy_platform/async-io"]
 
 [dependencies]
 bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-features = false, features = [
@@ -46,7 +46,6 @@ derive_more = { version = "2", default-features = false, features = [
 ] }
 async-executor = { version = "1.11", optional = true }
 async-channel = { version = "2.3.0", optional = true }
-async-io = { version = "2.0.0", optional = true }
 concurrent-queue = { version = "2.0.0", optional = true }
 atomic-waker = { version = "1", default-features = false }
 crossbeam-queue = { version = "0.3", default-features = false, features = [
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs
index 4ef156170a87e..da1a1a0538017 100644
--- a/crates/bevy_tasks/src/lib.rs
+++ b/crates/bevy_tasks/src/lib.rs
@@ -28,15 +28,6 @@ pub mod cfg {
             conditional_send
         }
 
-        #[cfg(feature = "async-io")] => {
-            /// Indicates `async-io` will be used for the implementation of `block_on`.
-            async_io
-        }
-
-        #[cfg(feature = "futures-lite")] => {
-            /// Indicates `futures-lite` will be used for the implementation of `block_on`.
-            futures_lite
-        }
     }
 }
 
@@ -114,36 +105,7 @@ cfg::multi_threaded! {
     }
 }
 
-cfg::switch! {
-    cfg::async_io => {
-        pub use async_io::block_on;
-    }
-    cfg::futures_lite => {
-        pub use futures_lite::future::block_on;
-    }
-    _ => {
-        /// Blocks on the supplied `future`.
-        /// This implementation will busy-wait until it is completed.
-        /// Consider enabling the `async-io` or `futures-lite` features.
-        pub fn block_on<T>(future: impl Future<Output = T>) -> T {
-            use core::task::{Poll, Context};
-
-            // Pin the future on the stack.
-            let mut future = core::pin::pin!(future);
-
-            // We don't care about the waker as we're just going to poll as fast as possible.
-            let cx = &mut Context::from_waker(core::task::Waker::noop());
-
-            // Keep polling until the future is ready.
-            loop {
-                match future.as_mut().poll(cx) {
-                    Poll::Ready(output) => return output,
-                    Poll::Pending => core::hint::spin_loop(),
-                }
-            }
-        }
-    }
-}
+pub use bevy_platform::future::block_on;
 
 /// The tasks prelude.
 ///
diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml
index 325dfb4b0d871..96d726009c10a 100644
--- a/crates/bevy_utils/Cargo.toml
+++ b/crates/bevy_utils/Cargo.toml
@@ -9,11 +9,13 @@ license = "MIT OR Apache-2.0"
 keywords = ["bevy"]
 
 [features]
-default = ["parallel"]
+default = ["parallel", "buffered_channel"]
 
 # Provides access to the `Parallel` type.
 parallel = ["bevy_platform/std", "dep:thread_local"]
 
+buffered_channel = ["bevy_platform/std", "dep:async-channel"]
+
 std = ["disqualified/alloc"]
 
 debug = ["bevy_platform/alloc"]
@@ -23,9 +25,13 @@ bevy_platform = { path = "../bevy_platform", version = "0.19.0-dev", default-fea
 disqualified = { version = "1.0", default-features = false }
 
 thread_local = { version = "1.0", optional = true }
+async-channel = { version = "2.3.0", optional = true }
 
 [dev-dependencies]
 static_assertions = "1.1.0"
+bevy_ecs = { path = "../bevy_ecs", version = "0.19.0-dev", default-features = false }
+bevy_app = { path = "../bevy_app", version = "0.19.0-dev", default-features = false }
+bevy_tasks = { path = "../bevy_tasks", version = "0.19.0-dev", default-features = false }
 
 [lints]
 workspace = true
diff --git a/crates/bevy_utils/src/buffered_channel.rs b/crates/bevy_utils/src/buffered_channel.rs
new file mode 100644
index 0000000000000..e717380b7ddfa
--- /dev/null
+++ b/crates/bevy_utils/src/buffered_channel.rs
@@ -0,0 +1,288 @@
+use crate::Parallel;
+use alloc::vec::Vec;
+use async_channel::{Receiver, Sender};
+use core::ops::{Deref, DerefMut};
+
+/// An asynchronous MPSC channel that buffers messages and reuses allocations with thread locals.
+///
+/// This is a building block for efficient parallel worker tasks.
+///
+/// Cache this channel in a system's `Local` to reuse allocated memory.
+///
+/// This is faster than sending each message individually into a channel when communicating between
+/// tasks. Unlike `Parallel`, this allows you to execute a consuming task while producing tasks are
+/// concurrently sending data into the channel, enabling you to run a serial processing consumer
+/// at the same time as many parallel processing producers.
+///
+/// # Usage
+///
+/// ```
+/// use bevy_utils::BufferedChannel;
+/// use bevy_app::{App, TaskPoolPlugin, Update};
+/// use bevy_ecs::system::Local;
+/// use bevy_tasks::ComputeTaskPool;
+///
+/// App::new()
+///     .add_plugins(TaskPoolPlugin::default())
+///     .add_systems(Update, parallel_system)
+///     .update();
+///
+/// fn parallel_system(channel: Local<BufferedChannel<u64>>) {
+///     let (rx, tx) = channel.unbounded();
+///     ComputeTaskPool::get().scope(|scope| {
+///         // Spawn a single consumer task that reads from the producers. Note we can spawn this
+///         // first and have it immediately start processing the messages produced in parallel.
+///         // Because we are receiving asynchronously, we avoid deadlocks even on a single thread.
+///         scope.spawn(async move {
+///             let mut total = 0;
+///             let mut count = 0;
+///             while let Ok(mut chunk) = rx.recv().await {
+///                 count += chunk.len();
+///                 total += chunk.iter().sum::<u64>();
+///             }
+///             assert_eq!(count, 500_000);
+///             assert_eq!(total, 24_999_750_000);
+///         });
+///
+///         // Spawn a few producing tasks in parallel that send data into the buffered channel.
+///         for _ in 0..5 {
+///             let mut tx = tx.clone();
+///             scope.spawn(async move {
+///                 // Because this is buffered, we can iterate over hundreds of thousands of
+///                 // entities in each task while avoiding allocation and channel overhead.
+///                 // The buffer is flushed periodically, sending chunks of data to the receiver.
+///                 for i in 0..100_000 {
+///                     tx.send(i).await;
+///                 }
+///             });
+///         }
+///
+///         // Drop the unused sender so the channel can close.
+///         drop(tx);
+///     });
+/// }
+/// ```
+pub struct BufferedChannel<T: Send> {
+    /// The minimum length of a `Vec` of buffered data before it is sent through the channel.
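+    ///
+    /// Larger chunks amortize per-message channel overhead; smaller chunks let
+    /// the consumer start working sooner.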
+    pub chunk_size: usize,
+    /// A pool of reusable vectors to minimize allocations.
+    pool: Parallel<Vec<Vec<T>>>,
+}
+
+impl<T: Send> Default for BufferedChannel<T> {
+    fn default() -> Self {
+        Self {
+            // This was tuned based on benchmarks across a wide range of sizes.
+            chunk_size: 1024,
+            pool: Parallel::default(),
+        }
+    }
+}
+
+/// A wrapper around a [`Receiver`] that returns [`RecycledVec`]s to automatically return
+/// buffers to the [`BufferedChannel`] pool.
+pub struct BufferedReceiver<'a, T: Send> {
+    channel: &'a BufferedChannel<T>,
+    rx: Receiver<Vec<T>>,
+}
+
+impl<'a, T: Send> BufferedReceiver<'a, T> {
+    /// Receive a message asynchronously.
+    ///
+    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
+    pub async fn recv(&self) -> Result<RecycledVec<'a, T>, async_channel::RecvError> {
+        let buffer = self.rx.recv().await?;
+        Ok(RecycledVec {
+            buffer: Some(buffer),
+            channel: self.channel,
+        })
+    }
+
+    /// Receive a message blocking.
+    ///
+    /// The returned [`RecycledVec`] will automatically return the buffer to the pool when dropped.
+    pub fn recv_blocking(&self) -> Result<RecycledVec<'a, T>, async_channel::RecvError> {
+        #[cfg(all(feature = "std", not(target_family = "wasm")))]
+        let buffer = self.rx.recv_blocking()?;
+        #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+        let buffer = bevy_platform::future::block_on(self.rx.recv())?;
+
+        Ok(RecycledVec {
+            buffer: Some(buffer),
+            channel: self.channel,
+        })
+    }
+}
+
+/// A wrapper around a `Vec` that automatically returns it to the [`BufferedChannel`]'s pool when
+/// dropped.
+pub struct RecycledVec<'a, T: Send> {
+    buffer: Option<Vec<T>>,
+    channel: &'a BufferedChannel<T>,
+}
+
+impl<'a, T: Send> RecycledVec<'a, T> {
+    /// Drains the elements from the buffer as an iterator, keeping the allocation
+    /// so it can be recycled when this [`RecycledVec`] is dropped.
+    pub fn drain(&mut self) -> alloc::vec::Drain<'_, T> {
+        self.buffer.as_mut().unwrap().drain(..)
+    }
+}
+
+impl<'a, T: Send> Deref for RecycledVec<'a, T> {
+    type Target = [T];
+    fn deref(&self) -> &Self::Target {
+        self.buffer.as_ref().unwrap()
+    }
+}
+
+impl<'a, T: Send> DerefMut for RecycledVec<'a, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.buffer.as_mut().unwrap()
+    }
+}
+
+impl<'a, 'b, T: Send> IntoIterator for &'b RecycledVec<'a, T> {
+    type Item = &'b T;
+    type IntoIter = core::slice::Iter<'b, T>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.buffer.as_ref().unwrap().iter()
+    }
+}
+
+impl<'a, 'b, T: Send> IntoIterator for &'b mut RecycledVec<'a, T> {
+    type Item = &'b mut T;
+    type IntoIter = core::slice::IterMut<'b, T>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.buffer.as_mut().unwrap().iter_mut()
+    }
+}
+
+impl<'a, T: Send> Drop for RecycledVec<'a, T> {
+    fn drop(&mut self) {
+        if let Some(mut buffer) = self.buffer.take() {
+            buffer.clear();
+            self.channel.pool.borrow_local_mut().push(buffer);
+        }
+    }
+}
+
+/// A [`BufferedChannel`] sender that buffers messages locally, flushing it when the sender is
+/// dropped or [`BufferedChannel::chunk_size`] is reached.
+pub struct BufferedSender<'a, T: Send> {
+    channel: &'a BufferedChannel<T>,
+    /// We use an `Option` to lazily allocate the buffer or pull from the channel's buffer pool.
+    buffer: Option<Vec<T>>,
+    tx: Sender<Vec<T>>,
+}
+
+impl<T: Send> BufferedChannel<T> {
+    fn get_buffer(&self) -> Vec<T> {
+        self.pool
+            .borrow_local_mut()
+            .pop()
+            .unwrap_or_else(|| Vec::with_capacity(self.chunk_size))
+    }
+
+    /// Create an unbounded channel and return the receiver and sender.
+    ///
+    /// The created channel can hold an unlimited number of messages.
+    pub fn unbounded(&self) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
+        let (tx, rx) = async_channel::unbounded();
+        (
+            BufferedReceiver { channel: self, rx },
+            BufferedSender {
+                channel: self,
+                buffer: None,
+                tx,
+            },
+        )
+    }
+
+    /// Create a bounded channel and return the receiver and sender.
+    ///
+    /// The created channel has space to hold at most `cap` messages at a time.
+    ///
+    /// # Panics
+    ///
+    /// Capacity must be a positive number. If `cap` is zero, this function will panic.
+    pub fn bounded(&self, cap: usize) -> (BufferedReceiver<'_, T>, BufferedSender<'_, T>) {
+        let (tx, rx) = async_channel::bounded(cap);
+        (
+            BufferedReceiver { channel: self, rx },
+            BufferedSender {
+                channel: self,
+                buffer: None,
+                tx,
+            },
+        )
+    }
+}
+
+impl<'a, T: Send> BufferedSender<'a, T> {
+    /// Send a message asynchronously.
+    ///
+    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
+    /// messages are accumulated or the sender is dropped.
+    pub async fn send(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
+        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
+        buffer.push(msg);
+        if buffer.len() >= self.channel.chunk_size {
+            let full_buffer = self.buffer.take().unwrap();
+            self.tx.send(full_buffer).await?;
+        }
+        Ok(())
+    }
+
+    /// Send an item blocking.
+    ///
+    /// This is buffered and will not be sent into the channel until [`BufferedChannel::chunk_size`]
+    /// messages are accumulated or the sender is dropped.
+    pub fn send_blocking(&mut self, msg: T) -> Result<(), async_channel::SendError<Vec<T>>> {
+        let buffer = self.buffer.get_or_insert_with(|| self.channel.get_buffer());
+        buffer.push(msg);
+        if buffer.len() >= self.channel.chunk_size {
+            let full_buffer = self.buffer.take().unwrap();
+            #[cfg(all(feature = "std", not(target_family = "wasm")))]
+            self.tx.send_blocking(full_buffer)?;
+            #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+            bevy_platform::future::block_on(self.tx.send(full_buffer))?;
+        }
+        Ok(())
+    }
+
+    /// Flush any remaining messages in the local buffer, sending them into the channel.
+    fn flush(&mut self) {
+        if let Some(buffer) = self.buffer.take() {
+            if !buffer.is_empty() {
+                // The allocation is sent through the channel and will be reused when dropped.
+                #[cfg(all(feature = "std", not(target_family = "wasm")))]
+                let _ = self.tx.send_blocking(buffer);
+                #[cfg(any(not(feature = "std"), target_family = "wasm"))]
+                let _ = bevy_platform::future::block_on(self.tx.send(buffer));
+            } else {
+                // If it's empty, just return it to the pool.
+                self.channel.pool.borrow_local_mut().push(buffer);
+            }
+        }
+    }
+}
+
+impl<'a, T: Send> Clone for BufferedSender<'a, T> {
+    fn clone(&self) -> Self {
+        Self {
+            channel: self.channel,
+            buffer: None,
+            tx: self.tx.clone(),
+        }
+    }
+}
+
+/// Automatically flush the buffer when a sender is dropped.
+impl<'a, T: Send> Drop for BufferedSender<'a, T> {
+    fn drop(&mut self) {
+        self.flush();
+    }
+}
diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs
index 9055b825f1f31..2569d4926a821 100644
--- a/crates/bevy_utils/src/lib.rs
+++ b/crates/bevy_utils/src/lib.rs
@@ -20,6 +20,10 @@ pub mod cfg {
             /// Indicates the `Parallel` type is available.
             parallel
         }
+        #[cfg(feature = "buffered_channel")] => {
+            /// Indicates the `BufferedChannel` type is available.
+            buffered_channel
+        }
     }
 }
 
@@ -39,6 +43,11 @@ cfg::parallel! {
     pub use parallel_queue::*;
 }
 
+cfg::buffered_channel! {
+    mod buffered_channel;
+    pub use buffered_channel::*;
+}
+
 /// The utilities prelude.
 ///
 /// This includes the most common types in this crate, re-exported for your convenience.
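The producers in `collect_meshes_for_gpu_building` above use the blocking half of this API. A minimal standalone sketch of the same pattern, with plain `std` threads standing in for the `ComputeTaskPool` scope; the element type, channel capacity, and message counts are illustrative:

```rust
use bevy_utils::BufferedChannel;
use std::thread;

fn main() {
    let channel = BufferedChannel::<u32>::default();
    let (rx, tx) = channel.bounded(8);

    thread::scope(|s| {
        // Producer: `send_blocking` buffers locally and only pushes a chunk
        // into the channel once `chunk_size` messages have accumulated.
        s.spawn(move || {
            let mut tx = tx;
            for i in 0..10_000u32 {
                tx.send_blocking(i).unwrap();
            }
            // Dropping `tx` flushes the final partial chunk; once every
            // sender is gone the channel closes and `recv_blocking` errors.
        });

        // Consumer: drain each recycled chunk; its allocation returns to the
        // channel's pool when the `RecycledVec` guard is dropped.
        let mut total: u64 = 0;
        while let Ok(mut chunk) = rx.recv_blocking() {
            total += chunk.drain().map(u64::from).sum::<u64>();
        }
        assert_eq!(total, (0..10_000u64).sum::<u64>());
    });
}
```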