Skip to content

Commit

Permalink
Remove destination address allocation (#1042)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Nov 26, 2024
1 parent 57040f8 commit 114e7e6
Show file tree
Hide file tree
Showing 27 changed files with 190 additions and 104 deletions.
4 changes: 4 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[target.'cfg(all())']
rustflags = [
"-Ctarget-feature=+aes,+avx2",
]
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 17 additions & 8 deletions benches/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,31 @@ fn token_router(b: Bencher, token_kind: &str) {
use rand::seq::SliceRandom as _;
let tok = tokens.choose(&mut rand).unwrap();

let mut rc = quilkin::filters::ReadContext::new(
cm.clone(),
quilkin::net::EndpointAddress::LOCALHOST,
pool.clone().alloc(),
);
rc.metadata.insert(
let mut metadata = quilkin::net::endpoint::DynamicMetadata::default();
metadata.insert(
quilkin::net::endpoint::metadata::Key::from_static(
quilkin::filters::capture::CAPTURED_BYTES,
),
quilkin::net::endpoint::metadata::Value::Bytes((*tok).clone().into()),
);

rc
(
cm.clone(),
pool.clone().alloc(),
Vec::with_capacity(1),
metadata,
)
})
.counter(divan::counter::BytesCount::new(total_token_size))
.bench_local_values(|mut rc| {
.bench_local_values(|(cm, buffer, mut dest, metadata)| {
let mut rc = quilkin::filters::ReadContext {
endpoints: cm,
destinations: &mut dest,
source: quilkin::net::EndpointAddress::LOCALHOST,
contents: buffer,
metadata,
};

let _ = divan::black_box(filter.sync_read(&mut rc));
})
}
Expand Down
9 changes: 8 additions & 1 deletion src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub enum PacketProcessorCtx {
sessions: Arc<crate::components::proxy::SessionPool>,
error_acc: super::error::ErrorAccumulator,
worker_id: usize,
destinations: Vec<crate::net::EndpointAddress>,
},
SessionPool {
pool: Arc<crate::components::proxy::SessionPool>,
Expand Down Expand Up @@ -326,6 +327,7 @@ fn process_packet(
sessions,
worker_id,
error_acc,
destinations,
} => {
let received_at = UtcTimestamp::now();
if let Some(last_received_at) = last_received_at {
Expand All @@ -340,7 +342,12 @@ fn process_packet(
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet, *worker_id, config, sessions, error_acc,
ds_packet,
*worker_id,
config,
sessions,
error_acc,
destinations,
);

packet_processed_event.write(1);
Expand Down
13 changes: 6 additions & 7 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl DownstreamReceiveWorkerConfig {
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
) {
tracing::trace!(
id = worker_id,
Expand All @@ -70,7 +71,7 @@ impl DownstreamReceiveWorkerConfig {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions) {
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Expand All @@ -92,6 +93,7 @@ impl DownstreamReceiveWorkerConfig {
packet: DownstreamPacket,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
destinations: &mut Vec<crate::net::EndpointAddress>,
) -> Result<(), PipelineError> {
if !config.clusters.read().has_endpoints() {
tracing::trace!("no upstream endpoints");
Expand All @@ -103,21 +105,18 @@ impl DownstreamReceiveWorkerConfig {
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
destinations,
);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext {
destinations,
contents,
..
} = context;
let ReadContext { contents, .. } = context;

// Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer
// into an immutable one with its own internal arc so it can be cloned
// cheaply and returned to the pool once all references are dropped
let contents = contents.freeze();

for epa in destinations {
for epa in destinations.drain(0..) {
let session_key = SessionKey {
source: packet.source,
dest: epa.to_socket_addr()?,
Expand Down
1 change: 1 addition & 0 deletions src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl super::DownstreamReceiveWorkerConfig {
sessions,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
destinations: Vec::with_capacity(1),
},
io_uring_shared::PacketReceiver::Router(upstream_receiver),
buffer_pool,
Expand Down
10 changes: 9 additions & 1 deletion src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl super::DownstreamReceiveWorkerConfig {

let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
let mut destinations = Vec::with_capacity(1);

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
Expand All @@ -131,7 +132,14 @@ impl super::DownstreamReceiveWorkerConfig {
}
last_received_at = Some(received_at);

Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc);
Self::process_task(
packet,
worker_id,
&config,
&sessions,
&mut error_acc,
&mut destinations,
);
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand Down
2 changes: 1 addition & 1 deletion src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
self.load().read(ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub trait Filter: Send + Sync {
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> {
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Capture {

impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let capture = self.capture.capture(&mut ctx.contents);
ctx.metadata.insert(
self.is_present_key,
Expand Down Expand Up @@ -160,11 +160,13 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
assert!(filter
.read(&mut ReadContext::new(
endpoints.into(),
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
alloc_buffer(b"abc"),
&mut dest,
))
.is_err());
}
Expand Down Expand Up @@ -237,10 +239,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut context = ReadContext::new(
endpoints.into(),
"127.0.0.1:80".parse().unwrap(),
alloc_buffer(b"helloabc"),
&mut dest,
);

filter.read(&mut context).unwrap();
Expand Down
40 changes: 20 additions & 20 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl schemars::JsonSchema for FilterChain {
}

impl Filter for FilterChain {
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
Expand All @@ -296,12 +296,8 @@ impl Filter for FilterChain {
// has rejected, and the destinations is empty, we passthrough to all.
// Which mimics the old behaviour while avoid clones in most cases.
if ctx.destinations.is_empty() {
ctx.destinations = ctx
.endpoints
.endpoints()
.into_iter()
.map(|ep| ep.address)
.collect();
ctx.destinations
.extend(ctx.endpoints.endpoints().into_iter().map(|ep| ep.address));
}

Ok(())
Expand Down Expand Up @@ -382,10 +378,12 @@ mod tests {
crate::test::load_test_filters();
let config = TestConfig::new();
let endpoints_fixture = endpoints();
let mut dest = Vec::new();
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);

config.filters.read(&mut context).unwrap();
Expand Down Expand Up @@ -435,22 +433,24 @@ mod tests {
.unwrap();

let endpoints_fixture = endpoints();
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
);

chain.read(&mut context).unwrap();
let mut dest = Vec::new();

let (contents, metadata) = {
let mut context = ReadContext::new(
endpoints_fixture.clone(),
"127.0.0.1:70".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);
chain.read(&mut context).unwrap();
(context.contents, context.metadata)
};
let expected = endpoints_fixture.clone();
assert_eq!(expected.endpoints(), context.destinations);
assert_eq!(
b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70",
&*context.contents
);
assert_eq!(expected.endpoints(), dest);
assert_eq!(b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70", &*contents);
assert_eq!(
"receive:receive",
context.metadata[&"downstream".into()].as_string().unwrap()
metadata[&"downstream".into()].as_string().unwrap()
);

let mut context = WriteContext::new(
Expand Down
10 changes: 9 additions & 1 deletion src/filters/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Compress {

impl Filter for Compress {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
let original_size = ctx.contents.len();

match self.on_read {
Expand Down Expand Up @@ -296,10 +296,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(&expected),
&mut dest,
);
compress.read(&mut read_context).expect("should compress");

Expand Down Expand Up @@ -356,11 +358,13 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
assert!(compression
.read(&mut ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
))
.is_err());
}
Expand All @@ -379,10 +383,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
alloc_buffer(b"hello"),
&mut dest,
);
compression.read(&mut read_context).unwrap();
assert_eq!(b"hello", &*read_context.contents);
Expand Down Expand Up @@ -474,10 +480,12 @@ mod tests {
let endpoints = crate::net::cluster::ClusterMap::new_default(
[Endpoint::new("127.0.0.1:81".parse().unwrap())].into(),
);
let mut dest = Vec::new();
let mut read_context = ReadContext::new(
endpoints.into(),
"127.0.0.1:8080".parse().unwrap(),
write_context.contents,
&mut dest,
);

filter.read(&mut read_context).expect("should decompress");
Expand Down
Loading

0 comments on commit 114e7e6

Please sign in to comment.