diff --git a/Cargo.lock b/Cargo.lock index 80fac49..e5ed8f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,9 +31,9 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" [[package]] name = "byteorder" @@ -136,11 +136,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "fixed-capacity-vec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b31a14f5ee08ed1a40e1252b35af18bed062e3f39b69aab34decde36bc43e40" + [[package]] name = "flate2" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" +checksum = "c936bfdafb507ebbf50b8074c54fa31c5be9a1e7e5f467dd659697041407d07c" dependencies = [ "crc32fast", "miniz_oxide", @@ -202,7 +208,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -269,7 +275,7 @@ checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -280,9 +286,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "hashbrown" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" [[package]] name = "indexmap" @@ -314,6 +320,7 @@ dependencies = [ "byteorder", "cpu-time", "default-boxed", + "fixed-capacity-vec", "flate2", "git-version", "log", @@ -330,9 +337,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.159" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "log" @@ -372,9 +379,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -408,9 +415,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.87" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" dependencies = [ "unicode-ident", ] @@ -466,9 +473,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -478,9 +485,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -525,7 +532,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.79", + "syn 2.0.87", "unicode-ident", ] @@ -540,9 +547,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "safe_arch" @@ -561,22 +568,22 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.210" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -619,9 +626,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -630,9 +637,9 @@ dependencies = [ [[package]] name = "thread-priority" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d3b04d33c9633b8662b167b847c7ab521f83d1ae20f2321b65b5b925e532e36" +checksum = "cfe075d7053dae61ac5413a34ea7d4913b6e6207844fd726bdd858b37ff72bf5" dependencies = [ "bitflags", "cfg-if", @@ -839,5 +846,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] diff --git a/Cargo.toml b/Cargo.toml index 5b332e6..78bbb4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ simple_logger ="5.0" unroll = "0.1" rayon-core = { version = "1", optional = true } git-version = "0.3" +fixed-capacity-vec = "1.0.1" [target.'cfg(windows)'.dependencies] cpu-time = "1.0" diff --git a/src/lepton_error.rs b/src/lepton_error.rs index 5d38f7a..d9e2843 100644 --- a/src/lepton_error.rs +++ b/src/lepton_error.rs @@ -126,6 +126,7 @@ pub trait AddContext { impl> AddContext for core::result::Result { #[track_caller] + #[inline(always)] fn context(self) -> Result { match self { Ok(x) => Ok(x), diff --git a/src/structs/lepton_encoder.rs b/src/structs/lepton_encoder.rs index abdaa3b..83845e4 100644 --- a/src/structs/lepton_encoder.rs +++ b/src/structs/lepton_encoder.rs @@ -117,8 +117,6 @@ pub fn lepton_encode_row_range( features, ) .context()?; - - bool_writer.flush_non_final_data().context()?; } if is_last_thread && full_file_compression { diff --git a/src/structs/multiplexer.rs b/src/structs/multiplexer.rs index ab3ff4e..23d4b71 100644 --- a/src/structs/multiplexer.rs +++ b/src/structs/multiplexer.rs @@ -1,4 +1,3 @@ -use std::cmp; use std::collections::VecDeque; use std::io::{Cursor, Read, Write}; use std::mem::swap; @@ -6,6 +5,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use byteorder::WriteBytesExt; +use fixed_capacity_vec::FixedCapacityVec; use crate::lepton_error::{AddContext, ExitCode, LeptonError, Result}; /// Implements a multiplexer that reads and writes blocks to a stream from multiple threads. @@ -22,48 +22,54 @@ enum Message { WriteBlock(usize, Vec), } +const WRITE_BUFFER_SIZE: usize = 65536; + pub struct MultiplexWriter { thread_id: usize, sender: Sender, - buffer: Vec, + buffer: FixedCapacityVec, } -const WRITE_BUFFER_SIZE: usize = 65536; - impl Write for MultiplexWriter { + #[inline(always)] fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut copy_start = 0; - while copy_start < buf.len() { - let amount_to_copy = cmp::min( - WRITE_BUFFER_SIZE - self.buffer.len(), - buf.len() - copy_start, - ); - self.buffer - .extend_from_slice(&buf[copy_start..copy_start + amount_to_copy]); - - if self.buffer.len() == WRITE_BUFFER_SIZE { - self.flush()?; + // the caller writes one byte at time so there's no need to optimize here + for &b in buf { + if self.buffer.try_push_or_discard(b).is_err() { + // send buffer if we are full and include this byte in it + self.send_buffer::(b); } - - copy_start += amount_to_copy; } - Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { if self.buffer.len() > 0 { - let mut new_buffer = Vec::with_capacity(WRITE_BUFFER_SIZE); - swap(&mut new_buffer, &mut self.buffer); - - self.sender - .send(Message::WriteBlock(self.thread_id, new_buffer)) - .unwrap(); + self.send_buffer::(0); } Ok(()) } } +impl MultiplexWriter { + #[cold] + fn send_buffer(&mut self, next_byte: u8) { + let mut new_buffer = FixedCapacityVec::new(); + if PUSH_NEXT { + new_buffer.push(next_byte); + } + + swap(&mut new_buffer, &mut self.buffer); + + // ignore errors here since there's really no need to interrupt our work + // since errors are not expected and this will slow down the processing + // by requiring the callers up the chain to check for errors + let _ = self + .sender + .send(Message::WriteBlock(self.thread_id, new_buffer.into())); + } +} + // if we are using Rayon, these are the primatives to use to spawn thread pool work items #[cfg(feature = "use_rayon")] fn my_scope<'scope, OP, R>(op: OP) -> R @@ -145,7 +151,7 @@ where let mut thread_writer = MultiplexWriter { thread_id: thread_id, sender: cloned_sender, - buffer: Vec::with_capacity(WRITE_BUFFER_SIZE), + buffer: FixedCapacityVec::new(), }; let processor_clone = arc_processor.clone(); diff --git a/src/structs/vpx_bool_writer.rs b/src/structs/vpx_bool_writer.rs index 49a6e9a..d8b3c4e 100644 --- a/src/structs/vpx_bool_writer.rs +++ b/src/structs/vpx_bool_writer.rs @@ -33,7 +33,8 @@ pub struct VPXBoolWriter { range: u32, count: i32, writer: W, - buffer: Vec, + num_buffered_bytes: u32, + buffered_byte: u8, model_statistics: Metrics, #[allow(dead_code)] pub hash: SimpleHash, @@ -45,8 +46,9 @@ impl VPXBoolWriter { low_value: 0, range: 255, count: -24, - buffer: Vec::new(), writer: writer, + num_buffered_bytes: 0, + buffered_byte: 0, model_statistics: Metrics::default(), hash: SimpleHash::new(), }; @@ -118,26 +120,7 @@ impl VPXBoolWriter { *tmp_count += shift; if *tmp_count >= 0 { - let offset = shift - *tmp_count; - - if ((*tmp_value << (offset - 1)) & 0x80000000) != 0 { - let mut x = self.buffer.len() - 1; - - while self.buffer[x] == 0xFF { - self.buffer[x] = 0; - - assert!(x > 0); - x -= 1; - } - - self.buffer[x] += 1; - } - - self.buffer.push((*tmp_value >> (24 - offset)) as u8); - *tmp_value <<= offset; - shift = *tmp_count; - *tmp_value &= 0xffffff; - *tmp_count -= 8; + self.send_to_output(&mut shift, tmp_count, tmp_value)?; } *tmp_value <<= shift; @@ -291,26 +274,59 @@ impl VPXBoolWriter { self.put_bit(false, &mut dummy_branch, ModelComponent::Dummy)?; } - self.writer.write_all(&self.buffer[..])?; + self.flush_buffered_bytes(0)?; + Ok(()) } - /// When buffer is full and is going to be sent to output, preserve buffer data that - /// is not final and should carried over to the next buffer. - pub fn flush_non_final_data(&mut self) -> Result<()> { - // carry over buffer data that might be not final - let mut i = self.buffer.len(); - if i > 0 { - i -= 1; - while self.buffer[i] == 0xFF { - assert!(i > 0); - i -= 1; - } + #[inline(always)] + fn send_to_output( + &mut self, + shift: &mut i32, + tmp_count: &mut i32, + tmp_low_value: &mut u32, + ) -> Result<()> { + let offset = *shift - *tmp_count; - self.writer.write_all(&self.buffer[..i])?; - self.buffer.drain(..i); + let last_byte = *tmp_low_value >> (24 - offset); + + if (last_byte & 0x100) != 0 { + self.flush_buffered_bytes(1)?; } + let last_byte = last_byte as u8; + + if last_byte == 0xff { + self.num_buffered_bytes += 1; + } else { + self.flush_buffered_bytes(0)?; + + self.buffered_byte = last_byte; + self.num_buffered_bytes = 1; + } + + *tmp_low_value <<= offset; + *shift = *tmp_count; + *tmp_low_value &= 0xffffff; + *tmp_count -= 8; + + Ok(()) + } + + #[inline(always)] + fn flush_buffered_bytes(&mut self, carry: u8) -> Result<()> { + let mut b = self.num_buffered_bytes; + if b > 0 { + self.writer + .write(&[self.buffered_byte.wrapping_add(carry)])?; + b -= 1; + + while b > 0 { + self.writer.write(&[0xffu8.wrapping_add(carry)])?; + b -= 1; + } + self.num_buffered_bytes = 0; + } Ok(()) } }