
Commit

Impl compress
Jake-Shadle committed Dec 20, 2024
1 parent d81fd25 commit e71897f
Showing 6 changed files with 163 additions and 154 deletions.
9 changes: 3 additions & 6 deletions src/components/proxy/packet_router.rs
@@ -95,13 +95,10 @@ impl DownstreamReceiveWorkerConfig {
return Err(PipelineError::NoUpstreamEndpoints);
}

let cm = config.clusters.clone_value();
let filters = config.filters.load();
let mut context = ReadContext::new(
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
destinations,
);
let mut context =
ReadContext::new(&cm, packet.source.into(), packet.contents, destinations);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext { contents, .. } = context;
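
For context: ReadContext::new now borrows the cluster map instead of owning it, so the hunk above hoists the clone_value() snapshot into a named local. A minimal sketch of why that binding matters, using the same names as the diff:

    // Binding the snapshot to `cm` keeps it alive for as long as `context`
    // borrows it. Passing `&config.clusters.clone_value()` inline would borrow
    // a temporary that is dropped at the end of the `let` statement, before
    // `filters.read(&mut context)` runs.
    let cm = config.clusters.clone_value();
    let filters = config.filters.load();
    let mut context =
        ReadContext::new(&cm, packet.source.into(), packet.contents, destinations);
    filters.read(&mut context).map_err(PipelineError::Filter)?;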
11 changes: 7 additions & 4 deletions src/filters.rs
@@ -25,7 +25,7 @@ mod set;
mod write;

pub mod capture;
//pub mod compress;
pub mod compress;
pub mod concatenate;
pub mod debug;
pub mod drop;
@@ -51,7 +51,7 @@ pub mod prelude {
#[doc(inline)]
pub use self::{
capture::Capture,
//compress::Compress,
compress::Compress,
concatenate::Concatenate,
debug::Debug,
drop::Drop,
@@ -77,7 +77,7 @@ pub use self::chain::FilterChain;
#[enum_dispatch::enum_dispatch(Filter)]
pub enum FilterKind {
Capture,
//Compress,
Compress,
Concatenate,
Debug,
Drop,
@@ -179,12 +179,15 @@
}
}

pub trait Packet {
pub trait Packet: Sized {
fn as_slice(&self) -> &[u8];
fn as_mut_slice(&mut self) -> &mut [u8];
fn set_len(&mut self, len: usize);
fn remove_head(&mut self, length: usize);
fn remove_tail(&mut self, length: usize);
fn extend_head(&mut self, bytes: &[u8]);
fn extend_tail(&mut self, bytes: &[u8]);
fn alloc_sized(&self, size: usize) -> Option<Self>;
}
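
The trait now requires Sized and gains alloc_sized, so a filter can produce a brand-new packet (as the reworked Compress filter below does) rather than only mutating the one it was handed. A purely illustrative impl for Vec<u8> (not part of this commit; the crate uses its own pooled buffer type) might look like this, assuming set_len only ever shrinks a packet:

    impl Packet for Vec<u8> {
        fn as_slice(&self) -> &[u8] {
            self
        }
        fn as_mut_slice(&mut self) -> &mut [u8] {
            self
        }
        fn set_len(&mut self, len: usize) {
            // Assumption: callers only shrink; growing would need a fill value.
            self.truncate(len);
        }
        fn remove_head(&mut self, length: usize) {
            self.drain(..length);
        }
        fn remove_tail(&mut self, length: usize) {
            let keep = self.len().saturating_sub(length);
            self.truncate(keep);
        }
        fn extend_head(&mut self, bytes: &[u8]) {
            self.splice(..0, bytes.iter().copied());
        }
        fn extend_tail(&mut self, bytes: &[u8]) {
            self.extend_from_slice(bytes);
        }
        fn alloc_sized(&self, size: usize) -> Option<Self> {
            // A plain Vec can always allocate; a pooled buffer might return None.
            Some(vec![0; size])
        }
    }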

/// Trait for routing and manipulating packets.
178 changes: 84 additions & 94 deletions src/filters/compress.rs
@@ -20,11 +20,10 @@ mod metrics;

use crate::generated::quilkin::filters::compress::v1alpha1 as proto;

use crate::{filters::prelude::*, pool::BufferPool};
use crate::filters::prelude::*;

pub use compressor::Compressor;
use metrics::Metrics;
use std::sync::Arc;

pub use config::{Action, Config, Mode};

@@ -37,7 +36,6 @@ pub struct Compress {
compression_mode: Mode,
on_read: Action,
on_write: Action,
pool: Arc<BufferPool>,
compressor: Compressor,
}

@@ -49,98 +47,93 @@ impl Compress {
compression_mode: config.mode,
on_read: config.on_read,
on_write: config.on_write,
pool: Arc::new(BufferPool::new(num_cpus::get(), 64 * 1024)),
}
}
}
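
For reference, building the filter directly from its typed config might look roughly as follows. This is a hypothetical usage sketch: the full new() signature sits above this hunk, and Metrics::new() plus the Mode::Snappy variant are assumptions inferred from the config.mode / config.on_read / config.on_write accesses above.

    // Hypothetical construction sketch; none of this is shown in the diff.
    let config = Config {
        mode: Mode::Snappy,
        on_read: Action::Compress,
        on_write: Action::Decompress,
    };
    let filter = Compress::new(config, Metrics::new());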

impl Filter for Compress {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let original_size = ctx.contents.len();
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
let original_size = ctx.contents.as_slice().len();

match self.on_read {
Action::Compress => {
match self.compressor.encode(self.pool.clone(), &mut ctx.contents) {
Ok(()) => {
self.metrics
.read_decompressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.read_compressed_bytes_total
.inc_by(ctx.contents.len() as u64);
Ok(())
}
Err(err) => Err(CompressionError::new(
err,
Direction::Read,
Action::Compress,
)),
Action::Compress => match self.compressor.encode(&ctx.contents) {
Ok(encoded) => {
self.metrics
.read_decompressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.read_compressed_bytes_total
.inc_by(encoded.as_slice().len() as u64);
ctx.contents = encoded;
Ok(())
}
}
Action::Decompress => {
match self.compressor.decode(self.pool.clone(), &mut ctx.contents) {
Ok(()) => {
self.metrics
.read_compressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.read_decompressed_bytes_total
.inc_by(ctx.contents.len() as u64);
Ok(())
}
Err(err) => Err(CompressionError::new(
err,
Direction::Read,
Action::Decompress,
)),
Err(err) => Err(CompressionError::new(
err,
Direction::Read,
Action::Compress,
)),
},
Action::Decompress => match self.compressor.decode(&ctx.contents) {
Ok(decoded) => {
self.metrics
.read_compressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.read_decompressed_bytes_total
.inc_by(decoded.as_slice().len() as u64);
ctx.contents = decoded;
Ok(())
}
}
Err(err) => Err(CompressionError::new(
err,
Direction::Read,
Action::Decompress,
)),
},
Action::DoNothing => Ok(()),
}
}

#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
let original_size = ctx.contents.len();
fn write<P: Packet>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
let original_size = ctx.contents.as_slice().len();
match self.on_write {
Action::Compress => {
match self.compressor.encode(self.pool.clone(), &mut ctx.contents) {
Ok(()) => {
self.metrics
.write_decompressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.write_compressed_bytes_total
.inc_by(ctx.contents.len() as u64);
Ok(())
}
Err(err) => Err(CompressionError::new(
err,
Direction::Write,
Action::Compress,
)),
Action::Compress => match self.compressor.encode(&ctx.contents) {
Ok(encoded) => {
self.metrics
.write_decompressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.write_compressed_bytes_total
.inc_by(encoded.as_slice().len() as u64);
ctx.contents = encoded;
Ok(())
}
}
Action::Decompress => {
match self.compressor.decode(self.pool.clone(), &mut ctx.contents) {
Ok(()) => {
self.metrics
.write_compressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.write_decompressed_bytes_total
.inc_by(ctx.contents.len() as u64);
Ok(())
}

Err(err) => Err(CompressionError::new(
err,
Direction::Write,
Action::Decompress,
)),
Err(err) => Err(CompressionError::new(
err,
Direction::Write,
Action::Compress,
)),
},
Action::Decompress => match self.compressor.decode(&ctx.contents) {
Ok(decoded) => {
self.metrics
.write_compressed_bytes_total
.inc_by(original_size as u64);
self.metrics
.write_decompressed_bytes_total
.inc_by(decoded.as_slice().len() as u64);
ctx.contents = decoded;
Ok(())
}
}

Err(err) => Err(CompressionError::new(
err,
Direction::Write,
Action::Decompress,
)),
},
Action::DoNothing => Ok(()),
}
}
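
The call sites above imply a reshaped compressor API: encode and decode now take the input packet by shared reference and hand back a freshly allocated packet (presumably obtained through Packet::alloc_sized), replacing the old in-place, pool-backed encode(self.pool.clone(), &mut ctx.contents). The real definitions live in compressor.rs, which this diff does not show, so the following is an inferred signature sketch only; the error type is a stand-in:

    // Inferred shape, not the actual compressor.rs. The point is the ownership
    // change: borrow the input, return a new packet, and leave the caller to
    // swap it into ctx.contents.
    impl Compressor {
        pub fn encode<P: Packet>(&self, contents: &P) -> Result<P, CompressorError> {
            // compress contents.as_slice() into contents.alloc_sized(compressed_len)
            todo!("not part of this hunk")
        }

        pub fn decode<P: Packet>(&self, contents: &P) -> Result<P, CompressorError> {
            // inverse of encode
            todo!("not part of this hunk")
        }
    }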
@@ -238,9 +231,7 @@ impl std::hash::Hash for CompressionError {
#[cfg(test)]
mod tests {
use crate::{
filters::compress::compressor::Compressor,
net::endpoint::Endpoint,
test::{alloc_buffer, BUFFER_POOL},
filters::compress::compressor::Compressor, net::endpoint::Endpoint, test::alloc_buffer,
};

use super::*;
@@ -298,7 +289,7 @@ mod tests {
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
&endpoints,
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(&expected),
&mut dest,
@@ -361,7 +352,7 @@ mod tests {
let mut dest = Vec::new();
assert!(compression
.read(&mut ReadContext::new(
endpoints.into(),
&endpoints,
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
@@ -385,7 +376,7 @@
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
&endpoints,
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
@@ -406,31 +397,30 @@

fn roundtrip_compression(compressor: Compressor) {
let expected = contents_fixture();
let mut contents = alloc_buffer(&expected);
let contents = alloc_buffer(&expected);

compressor
.encode(BUFFER_POOL.clone(), &mut contents)
.expect("failed to compress");
let compressed = compressor.encode(&contents).expect("failed to compress");
assert!(
!contents.is_empty(),
"compressed array should be greater than 0"
);
assert_ne!(
expected, &*contents,
expected,
compressed.as_slice(),
"should not be equal, as one should be compressed"
);
assert!(
expected.len() > contents.len(),
expected.len() > compressed.len(),
"Original: {}. Compressed: {}",
expected.len(),
contents.len()
compressed.len()
); // 45000 bytes uncompressed, 276 bytes compressed

compressor
.decode(BUFFER_POOL.clone(), &mut contents)
let decompressed = compressor
.decode(&compressed)
.expect("failed to decompress");
assert_eq!(
expected, &*contents,
expected, &*decompressed,
"should be equal, as decompressed state should go back to normal"
);
}
@@ -482,7 +472,7 @@
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
&endpoints,
"127.0.0.1:8080".parse().unwrap(),
write_context.contents,
&mut dest,