diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 5d54c99e23..6b773af8ba 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -95,13 +95,10 @@ impl DownstreamReceiveWorkerConfig { return Err(PipelineError::NoUpstreamEndpoints); } + let cm = config.clusters.clone_value(); let filters = config.filters.load(); - let mut context = ReadContext::new( - config.clusters.clone_value(), - packet.source.into(), - packet.contents, - destinations, - ); + let mut context = + ReadContext::new(&cm, packet.source.into(), packet.contents, destinations); filters.read(&mut context).map_err(PipelineError::Filter)?; let ReadContext { contents, .. } = context; diff --git a/src/filters.rs b/src/filters.rs index 25da12ff55..0483d5d948 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -25,7 +25,7 @@ mod set; mod write; pub mod capture; -//pub mod compress; +pub mod compress; pub mod concatenate; pub mod debug; pub mod drop; @@ -51,7 +51,7 @@ pub mod prelude { #[doc(inline)] pub use self::{ capture::Capture, - //compress::Compress, + compress::Compress, concatenate::Concatenate, debug::Debug, drop::Drop, @@ -77,7 +77,7 @@ pub use self::chain::FilterChain; #[enum_dispatch::enum_dispatch(Filter)] pub enum FilterKind { Capture, - //Compress, + Compress, Concatenate, Debug, Drop, @@ -179,12 +179,15 @@ where } } -pub trait Packet { +pub trait Packet: Sized { fn as_slice(&self) -> &[u8]; + fn as_mut_slice(&mut self) -> &mut [u8]; + fn set_len(&mut self, len: usize); fn remove_head(&mut self, length: usize); fn remove_tail(&mut self, length: usize); fn extend_head(&mut self, bytes: &[u8]); fn extend_tail(&mut self, bytes: &[u8]); + fn alloc_sized(&self, size: usize) -> Option; } /// Trait for routing and manipulating packets. diff --git a/src/filters/compress.rs b/src/filters/compress.rs index 1b334dcc45..581c214aac 100644 --- a/src/filters/compress.rs +++ b/src/filters/compress.rs @@ -20,11 +20,10 @@ mod metrics; use crate::generated::quilkin::filters::compress::v1alpha1 as proto; -use crate::{filters::prelude::*, pool::BufferPool}; +use crate::filters::prelude::*; pub use compressor::Compressor; use metrics::Metrics; -use std::sync::Arc; pub use config::{Action, Config, Mode}; @@ -37,7 +36,6 @@ pub struct Compress { compression_mode: Mode, on_read: Action, on_write: Action, - pool: Arc, compressor: Compressor, } @@ -49,98 +47,93 @@ impl Compress { compression_mode: config.mode, on_read: config.on_read, on_write: config.on_write, - pool: Arc::new(BufferPool::new(num_cpus::get(), 64 * 1024)), } } } impl Filter for Compress { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { - let original_size = ctx.contents.len(); + fn read(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> { + let original_size = ctx.contents.as_slice().len(); match self.on_read { - Action::Compress => { - match self.compressor.encode(self.pool.clone(), &mut ctx.contents) { - Ok(()) => { - self.metrics - .read_decompressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .read_compressed_bytes_total - .inc_by(ctx.contents.len() as u64); - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Read, - Action::Compress, - )), + Action::Compress => match self.compressor.encode(&ctx.contents) { + Ok(encoded) => { + self.metrics + .read_decompressed_bytes_total + .inc_by(original_size as u64); + self.metrics + .read_compressed_bytes_total + .inc_by(encoded.as_slice().len() as u64); + ctx.contents = encoded; + Ok(()) } - } - Action::Decompress => { - match self.compressor.decode(self.pool.clone(), &mut ctx.contents) { - Ok(()) => { - self.metrics - .read_compressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .read_decompressed_bytes_total - .inc_by(ctx.contents.len() as u64); - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Read, - Action::Decompress, - )), + Err(err) => Err(CompressionError::new( + err, + Direction::Read, + Action::Compress, + )), + }, + Action::Decompress => match self.compressor.decode(&ctx.contents) { + Ok(decoded) => { + self.metrics + .read_compressed_bytes_total + .inc_by(original_size as u64); + self.metrics + .read_decompressed_bytes_total + .inc_by(decoded.as_slice().len() as u64); + ctx.contents = decoded; + Ok(()) } - } + Err(err) => Err(CompressionError::new( + err, + Direction::Read, + Action::Decompress, + )), + }, Action::DoNothing => Ok(()), } } #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> { - let original_size = ctx.contents.len(); + fn write(&self, ctx: &mut WriteContext

) -> Result<(), FilterError> { + let original_size = ctx.contents.as_slice().len(); match self.on_write { - Action::Compress => { - match self.compressor.encode(self.pool.clone(), &mut ctx.contents) { - Ok(()) => { - self.metrics - .write_decompressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .write_compressed_bytes_total - .inc_by(ctx.contents.len() as u64); - Ok(()) - } - Err(err) => Err(CompressionError::new( - err, - Direction::Write, - Action::Compress, - )), + Action::Compress => match self.compressor.encode(&ctx.contents) { + Ok(encoded) => { + self.metrics + .write_decompressed_bytes_total + .inc_by(original_size as u64); + self.metrics + .write_compressed_bytes_total + .inc_by(encoded.as_slice().len() as u64); + ctx.contents = encoded; + Ok(()) } - } - Action::Decompress => { - match self.compressor.decode(self.pool.clone(), &mut ctx.contents) { - Ok(()) => { - self.metrics - .write_compressed_bytes_total - .inc_by(original_size as u64); - self.metrics - .write_decompressed_bytes_total - .inc_by(ctx.contents.len() as u64); - Ok(()) - } - - Err(err) => Err(CompressionError::new( - err, - Direction::Write, - Action::Decompress, - )), + Err(err) => Err(CompressionError::new( + err, + Direction::Write, + Action::Compress, + )), + }, + Action::Decompress => match self.compressor.decode(&ctx.contents) { + Ok(decoded) => { + self.metrics + .write_compressed_bytes_total + .inc_by(original_size as u64); + self.metrics + .write_decompressed_bytes_total + .inc_by(decoded.as_slice().len() as u64); + ctx.contents = decoded; + Ok(()) } - } + + Err(err) => Err(CompressionError::new( + err, + Direction::Write, + Action::Decompress, + )), + }, Action::DoNothing => Ok(()), } } @@ -238,9 +231,7 @@ impl std::hash::Hash for CompressionError { #[cfg(test)] mod tests { use crate::{ - filters::compress::compressor::Compressor, - net::endpoint::Endpoint, - test::{alloc_buffer, BUFFER_POOL}, + filters::compress::compressor::Compressor, net::endpoint::Endpoint, test::alloc_buffer, }; use super::*; @@ -298,7 +289,7 @@ mod tests { ); let mut dest = Vec::new(); let mut read_context = ReadContext::new( - endpoints.into(), + &endpoints, "127.0.0.1:8080".parse().unwrap(), alloc_buffer(&expected), &mut dest, @@ -361,7 +352,7 @@ mod tests { let mut dest = Vec::new(); assert!(compression .read(&mut ReadContext::new( - endpoints.into(), + &endpoints, "127.0.0.1:8080".parse().unwrap(), alloc_buffer(b"hello"), &mut dest, @@ -385,7 +376,7 @@ mod tests { ); let mut dest = Vec::new(); let mut read_context = ReadContext::new( - endpoints.into(), + &endpoints, "127.0.0.1:8080".parse().unwrap(), alloc_buffer(b"hello"), &mut dest, @@ -406,31 +397,30 @@ mod tests { fn roundtrip_compression(compressor: Compressor) { let expected = contents_fixture(); - let mut contents = alloc_buffer(&expected); + let contents = alloc_buffer(&expected); - compressor - .encode(BUFFER_POOL.clone(), &mut contents) - .expect("failed to compress"); + let compressed = compressor.encode(&contents).expect("failed to compress"); assert!( !contents.is_empty(), "compressed array should be greater than 0" ); assert_ne!( - expected, &*contents, + expected, + compressed.as_slice(), "should not be equal, as one should be compressed" ); assert!( - expected.len() > contents.len(), + expected.len() > compressed.len(), "Original: {}. Compressed: {}", expected.len(), - contents.len() + compressed.len() ); // 45000 bytes uncompressed, 276 bytes compressed - compressor - .decode(BUFFER_POOL.clone(), &mut contents) + let decompressed = compressor + .decode(&compressed) .expect("failed to decompress"); assert_eq!( - expected, &*contents, + expected, &*decompressed, "should be equal, as decompressed state should go back to normal" ); } @@ -482,7 +472,7 @@ mod tests { ); let mut dest = Vec::new(); let mut read_context = ReadContext::new( - endpoints.into(), + &endpoints, "127.0.0.1:8080".parse().unwrap(), write_context.contents, &mut dest, diff --git a/src/filters/compress/compressor.rs b/src/filters/compress/compressor.rs index 5f938e5630..371e3dc632 100644 --- a/src/filters/compress/compressor.rs +++ b/src/filters/compress/compressor.rs @@ -14,9 +14,9 @@ * limitations under the License. */ -use crate::pool::{BufferPool, PoolBuffer}; +use crate::filters::Packet; use parking_lot::Mutex; -use std::{io, sync::Arc}; +use std::io; use lz4_flex::block; use snap::raw; @@ -30,78 +30,97 @@ pub enum Compressor { } impl Compressor { - pub fn encode(&self, pool: Arc, contents: &mut PoolBuffer) -> io::Result<()> { + pub fn encode(&self, contents: &P) -> io::Result

{ + let input = contents.as_slice(); let encoded = match self { Self::Snappy(imp) => { - let size = raw::max_compress_len(contents.len()); - let mut encoded = pool.alloc_sized(size); + let size = raw::max_compress_len(input.len()); + let mut encoded = contents.alloc_sized(size).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::OutOfMemory, + "failed to allocate buffer for compress output", + ) + })?; let mut encoder = imp.encoder(); - let res = encoder.compress(contents, encoded.as_mut_slice(0..size)); + let res = encoder.compress(input, &mut encoded.as_mut_slice()[..dbg!(size)]); imp.absorb(encoder); - let compressed = res?; - encoded.truncate(compressed); + encoded.set_len(res?); encoded } Self::Lz4 => { - let size = block::get_maximum_output_size(contents.len()) + 3; - let mut encoded = pool.alloc_sized(size); - - let slen = size::write(encoded.as_mut_slice(0..size), contents.len() as u16); - - let compressed = block::compress_into(contents, encoded.as_mut_slice(slen..size)) - .map_err(|_e| { - // This should be impossible - io::Error::new( - io::ErrorKind::OutOfMemory, - "not enough space allocated for compressed output", + let size = block::get_maximum_output_size(input.len()) + 3; + let mut encoded = contents.alloc_sized(size).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::OutOfMemory, + "failed to allocate buffer for compress output", ) })?; - encoded.truncate(compressed + slen); + let slen = size::write(encoded.as_mut_slice(), input.len() as u16); + + let compressed = + block::compress_into(input, &mut encoded.as_mut_slice()[slen..size]).map_err( + |_e| { + // This should be impossible + io::Error::new( + io::ErrorKind::OutOfMemory, + "not enough space allocated for compressed output", + ) + }, + )?; + + encoded.set_len(compressed + slen); encoded } }; - *contents = encoded; - Ok(()) + Ok(encoded) } - pub fn decode(&self, pool: Arc, contents: &mut PoolBuffer) -> io::Result<()> { + pub fn decode(&self, contents: &P) -> io::Result

{ + let input = contents.as_slice(); let decoded = match self { Self::Snappy(_imp) => { - let size = raw::decompress_len(contents)?; - let mut decoded = pool.alloc_sized(size); + let size = raw::decompress_len(input)?; + let mut decoded = contents.alloc_sized(size).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::OutOfMemory, + "failed to allocate buffer for decompress output", + ) + })?; - let decompressed = - raw::Decoder::new().decompress(contents, decoded.as_mut_slice(0..size))?; + let decompressed = raw::Decoder::new().decompress(input, decoded.as_mut_slice())?; - decoded.truncate(decompressed); + decoded.set_len(decompressed); decoded } Self::Lz4 => { - let (size, slen) = size::read(contents); - let mut decoded = pool.alloc_sized(size as _); - - let decompressed = - block::decompress_into(&contents[slen..], decoded.as_mut_slice(0..size as _)) - .map_err(|_e| { - // This should be impossible - io::Error::new( - io::ErrorKind::OutOfMemory, - "not enough space allocated for decompressed output", - ) - })?; - - decoded.truncate(decompressed); + let (size, slen) = size::read(input); + let mut decoded = contents.alloc_sized(size as _).ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::OutOfMemory, + "failed to allocate buffer for decompress output", + ) + })?; + + let decompressed = block::decompress_into(&input[slen..], decoded.as_mut_slice()) + .map_err(|_e| { + // This should be impossible + io::Error::new( + io::ErrorKind::OutOfMemory, + "not enough space allocated for decompressed output", + ) + })?; + + decoded.set_len(decompressed); decoded } }; - *contents = decoded; - Ok(()) + Ok(decoded) } } diff --git a/src/filters/error.rs b/src/filters/error.rs index 899f2cc603..89a799abad 100644 --- a/src/filters/error.rs +++ b/src/filters/error.rs @@ -27,7 +27,7 @@ use filters::{Filter, FilterFactory}; pub enum FilterError { NoValueCaptured, TokenRouter(filters::token_router::RouterError), - //Compression(filters::compress::CompressionError), + Compression(filters::compress::CompressionError), Io(std::io::Error), FirewallDenied, MatchNoMetadata, @@ -41,7 +41,7 @@ impl FilterError { match self { Self::NoValueCaptured => "filter::capture::no value captured", Self::TokenRouter(tr) => tr.discriminant(), - //Self::Compression(_) => "filter::compression::io", + Self::Compression(_) => "filter::compression::io", Self::Io(..) => "filter::io", Self::FirewallDenied => "filter::firewall::denied", Self::MatchNoMetadata => "filter::match::no metadata", @@ -59,7 +59,7 @@ impl fmt::Display for FilterError { match self { Self::NoValueCaptured => f.write_str("no value captured"), Self::TokenRouter(tr) => write!(f, "{tr}"), - //Self::Compression(comp) => write!(f, "{comp}"), + Self::Compression(comp) => write!(f, "{comp}"), Self::Io(io) => write!(f, "{io}"), Self::FirewallDenied => f.write_str("packet denied by firewall"), Self::MatchNoMetadata => f.write_str("expected metadata key for match not present"), @@ -83,7 +83,7 @@ impl PartialEq for FilterError { match (self, other) { (Self::NoValueCaptured, Self::NoValueCaptured) => true, (Self::TokenRouter(tra), Self::TokenRouter(trb)) => tra.eq(trb), - //(Self::Compression(ca), Self::Compression(cb)) => ca.eq(cb), + (Self::Compression(ca), Self::Compression(cb)) => ca.eq(cb), (Self::Io(ia), Self::Io(ib)) => ia.kind().eq(&ib.kind()), (Self::FirewallDenied, Self::FirewallDenied) => true, (Self::MatchNoMetadata, Self::MatchNoMetadata) => true, @@ -104,7 +104,7 @@ impl Hash for FilterError { match self { Self::TokenRouter(re) => Hash::hash(&re, state), - //Self::Compression(ce) => Hash::hash(&ce, state), + Self::Compression(ce) => Hash::hash(&ce, state), Self::Io(io) => Hash::hash(&io.kind(), state), Self::Custom(ce) => state.write(ce.as_bytes()), Self::NoValueCaptured diff --git a/src/filters/set.rs b/src/filters/set.rs index e61b00e36d..fbcb715777 100644 --- a/src/filters/set.rs +++ b/src/filters/set.rs @@ -46,7 +46,7 @@ impl FilterSet { Self::with( [ filters::Capture::factory(), - //filters::Compress::factory(), + filters::Compress::factory(), filters::Concatenate::factory(), filters::Debug::factory(), filters::Drop::factory(),