Skip to content

Commit

Permalink
Make Config::filters a dynamic property (#1124)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Feb 20, 2025
1 parent 2272c27 commit 96e3bd0
Show file tree
Hide file tree
Showing 22 changed files with 223 additions and 125 deletions.
6 changes: 5 additions & 1 deletion crates/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,11 @@ impl Pail {
}

if !cfg.filters.is_empty() {
config.filters.store(Arc::new(cfg.filters));
config
.dyn_cfg
.filters()
.unwrap()
.store(Arc::new(cfg.filters));
}
}

Expand Down
123 changes: 55 additions & 68 deletions crates/test/tests/xdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,28 @@ fn endpoints(eps: &[(SocketAddr, &[u8])]) -> BTreeSet<net::Endpoint> {
.collect()
}

#[inline]
fn make_config(
    filters: filters::FilterChain,
    endpoints: BTreeSet<net::Endpoint>,
) -> process::ConfigState {
    // Register all endpoints under the single unnamed (default) cluster;
    // `ClusterMap` uses interior mutability, so no `mut` binding is needed.
    let clusters = quilkin::net::ClusterMap::new();
    clusters.insert(None, endpoints);

    // Wrap the pieces in the watch/slot containers `ConfigState` expects.
    process::ConfigState {
        filters: quilkin::config::Slot::new(filters),
        clusters: quilkin::config::Watch::new(clusters),
    }
}

/// Validates we can do basic processing and forwarding of packets
#[tokio::test]
async fn simple_forwarding() {
const SERVER: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 1111);
const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 7777);
const CLIENT: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(5, 5, 5, 5), 8888);

let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -53,15 +66,13 @@ async fn simple_forwarding() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf0])]));
});
endpoints(&[(SERVER.into(), &[0xf0])]),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -124,8 +135,7 @@ async fn changes_ip_version() {
SocketAddrV6::new(Ipv6Addr::new(2, 2, 2, 2, 2, 2, 2, 2), 7777, 0, 0);
const CLIENT: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(5, 5, 5, 5), 8888);

let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -138,15 +148,13 @@ async fn changes_ip_version() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf1])]));
});
endpoints(&[(SERVER.into(), &[0xf1])]),
);

let mut state = process::State {
external_port: PROXY4.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -258,8 +266,7 @@ async fn packet_manipulation() {

// Test suffix removal
{
let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -272,15 +279,13 @@ async fn packet_manipulation() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf1])]));
});
endpoints(&[(SERVER.into(), &[0xf1])]),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -322,8 +327,7 @@ async fn packet_manipulation() {

// Test prefix removal
{
let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -336,15 +340,13 @@ async fn packet_manipulation() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf1])]));
});
endpoints(&[(SERVER.into(), &[0xf1])]),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -387,8 +389,8 @@ async fn packet_manipulation() {
// Test suffix removal, combined with read append and write prepend
{
let concat_data = [0xff; 11];
let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let data = [0xf1u8; 20];
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -407,16 +409,13 @@ async fn packet_manipulation() {
.unwrap(),
])
.unwrap(),
));
let data = [0xf1u8; 20];
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &data[..data.len() - 2])]));
});
endpoints(&[(SERVER.into(), &data[..data.len() - 2])]),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -478,9 +477,9 @@ async fn multiple_servers() {
let mut servers: Vec<_> = (1..20)
.map(|i| SocketAddrV6::new(Ipv6Addr::new(i, i, i, i, i, i, i, i), 1000 + i, 0, 0))
.collect();
let tok = [0xf1u8];

let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -493,25 +492,19 @@ async fn multiple_servers() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
let tok = [0xf1u8];
config.clusters.modify(|clusters| {
clusters.insert(
None,
endpoints(
servers
.iter()
.map(|a| (SocketAddr::from(*a), &tok[..]))
.collect::<Vec<_>>()
.as_slice(),
),
)
});
endpoints(
servers
.iter()
.map(|a| (SocketAddr::from(*a), &tok[..]))
.collect::<Vec<_>>()
.as_slice(),
),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -567,8 +560,7 @@ async fn many_sessions() {
const SERVER: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 1111);
const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 7777);

let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -581,15 +573,13 @@ async fn many_sessions() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf0])]));
});
endpoints(&[(SERVER.into(), &[0xf0])]),
);

let mut state = process::State {
external_port: PROXY.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -697,8 +687,7 @@ async fn frees_dropped_packets() {
const CLIENT: SocketAddrV6 =
SocketAddrV6::new(Ipv6Addr::new(1, 2, 3, 4, 5, 6, 7, 8), 9999, 0, 0);

let config = quilkin::Config::default_non_agent();
config.filters.store(Arc::new(
let config = make_config(
filters::FilterChain::try_create([
filters::Capture::as_filter_config(filters::capture::Config {
metadata_key: filters::capture::CAPTURED_BYTES.into(),
Expand All @@ -711,15 +700,13 @@ async fn frees_dropped_packets() {
filters::TokenRouter::as_filter_config(None).unwrap(),
])
.unwrap(),
));
config.clusters.modify(|clusters| {
clusters.insert(None, endpoints(&[(SERVER.into(), &[0xf0])]));
});
endpoints(&[(SERVER.into(), &[0xf0])]),
);

let mut state = process::State {
external_port: PROXY4.port().into(),
qcmp_port: 0.into(),
config: Arc::new(config),
config,
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down Expand Up @@ -815,7 +802,7 @@ async fn qcmp() {
let mut state = process::State {
external_port: 7777.into(),
qcmp_port: PROXY.port().into(),
config: Arc::new(quilkin::Config::default_non_agent()),
config: make_config(filters::FilterChain::default(), endpoints(&[])),
destinations: Vec::with_capacity(1),
addr_to_asn: Default::default(),
sessions: Arc::new(Default::default()),
Expand Down
11 changes: 10 additions & 1 deletion src/cli/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,13 +489,22 @@ impl Service {
#[cfg(target_os = "linux")]
fn spawn_xdp(&self, config: Arc<Config>, force_xdp: bool) -> eyre::Result<Finalizer> {
use crate::net::xdp;
use eyre::Context as _;
use eyre::{Context as _, ContextCompat as _};

// TODO: remove this once it's been more stabilized
if !force_xdp {
eyre::bail!("XDP currently disabled by default");
}

let filters = config
.dyn_cfg
.filters()
.clone()
.context("XDP requires a filter chain")?;
let clusters = config.clusters.clone();

let config = crate::net::xdp::process::ConfigState { filters, clusters };

let udp_port = if self.udp_enabled { self.udp_port } else { 0 };
let qcmp_port = if self.qcmp_enabled { self.qcmp_port } else { 0 };

Expand Down
6 changes: 5 additions & 1 deletion src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ impl Proxy {

{
use crate::filters::StaticFilter as _;
config.filters.store(Arc::new(
let Some(filters) = config.dyn_cfg.filters() else {
eyre::bail!("empty filters were not created")
};

filters.store(Arc::new(
crate::filters::FilterChain::try_create([
crate::filters::Capture::as_filter_config(
crate::filters::capture::Config {
Expand Down
6 changes: 5 additions & 1 deletion src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ impl<P: PacketMut> DownstreamPacket<P> {
}

let cm = config.clusters.clone_value();
let filters = config.filters.load();
let Some(filters) = config.dyn_cfg.filters() else {
return Err(PipelineError::Filter(crate::filters::FilterError::Custom(
"no filters loaded",
)));
};
let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations);
filters.read(&mut context).map_err(PipelineError::Filter)?;

Expand Down
8 changes: 7 additions & 1 deletion src/components/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,14 @@ impl SessionPool {
tracing::trace!(%source, %dest, length = packet.len(), "received packet from upstream");

let mut context = crate::filters::WriteContext::new(source.into(), dest.into(), packet);
let Some(filters) = config.dyn_cfg.filters() else {
return Err((
asn_info,
Error::Filter(crate::filters::FilterError::Custom("no filters loaded")),
));
};

if let Err(err) = config.filters.load().write(&mut context) {
if let Err(err) = filters.load().write(&mut context) {
return Err((asn_info, err.into()));
}

Expand Down
Loading

0 comments on commit 96e3bd0

Please sign in to comment.