Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add static provider, allow XDP option be set through env, allow no command in CLI #1082

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 71 additions & 52 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub struct Cli {
#[clap(short, long, env)]
pub quiet: bool,
#[clap(subcommand)]
pub command: Commands,
pub command: Option<Commands>,
#[clap(
long,
default_value_t = LogFormats::Auto,
Expand All @@ -134,6 +134,8 @@ pub struct Cli {
#[command(flatten)]
pub locality: LocalityCli,
#[command(flatten)]
pub providers: crate::config::providersv2::Providers,
#[command(flatten)]
pub service: Service,
}

Expand Down Expand Up @@ -209,47 +211,55 @@ impl Cli {
// Non-long running commands (e.g. ones with no administration server)
// are executed here.
use crate::components::{self, admin as admin_server};
let mode = match &self.command {
Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await,
Commands::GenerateConfigSchema(generator) => {
return generator.generate_config_schema();
}
Commands::Agent(_) => Admin::Agent(<_>::default()),
Commands::Proxy(proxy) => {
let ready = components::proxy::Ready {
idle_request_interval: proxy
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Proxy(ready)
}
Commands::Manage(_mng) => {
let ready = components::manage::Ready {
is_manage: true,
..Default::default()
};
Admin::Manage(ready)
}
Commands::Relay(relay) => {
let ready = components::relay::Ready {
idle_request_interval: relay
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Relay(ready)
}
let mode = if let Some(command) = &self.command {
Some(match command {
Commands::Qcmp(Qcmp::Ping(ping)) => return ping.run().await,
Commands::GenerateConfigSchema(generator) => {
return generator.generate_config_schema();
}
Commands::Agent(_) => Admin::Agent(<_>::default()),
Commands::Proxy(proxy) => {
let ready = components::proxy::Ready {
idle_request_interval: proxy
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Proxy(ready)
}
Commands::Manage(_mng) => {
let ready = components::manage::Ready {
is_manage: true,
..Default::default()
};
Admin::Manage(ready)
}
Commands::Relay(relay) => {
let ready = components::relay::Ready {
idle_request_interval: relay
.idle_request_interval_secs
.map(std::time::Duration::from_secs)
.unwrap_or(admin_server::IDLE_REQUEST_INTERVAL),
..Default::default()
};
Admin::Relay(ready)
}
})
} else {
None
};

if !self.service.any_service_enabled() && mode.is_none() {
eyre::bail!("no service specified, shutting down");
}

tracing::debug!(cli = ?self, "config parameters");

let config = Arc::new(match Self::read_config(self.config)? {
Some(mut config) => {
// Workaround deficiency in serde flatten + untagged
if matches!(self.command, Commands::Agent(..)) {
if matches!(self.command, Some(Commands::Agent(..))) {
config.datacenter = match config.datacenter {
crate::config::DatacenterConfig::Agent {
icao_code,
Expand All @@ -272,38 +282,47 @@ impl Cli {

config
}
None if matches!(self.command, Commands::Agent(..)) => Config::default_agent(),
None if matches!(self.command, Some(Commands::Agent(..))) => Config::default_agent(),
None => Config::default_non_agent(),
});

if self.admin.enabled {
mode.server(config.clone(), self.admin.address);
if let Some(mode) = mode.as_ref() {
mode.server(config.clone(), self.admin.address);
}
}

let shutdown_rx = crate::signal::spawn_handler();
let mut shutdown_rx = crate::signal::spawn_handler();

crate::alloc::spawn_heap_stats_updates(
std::time::Duration::from_secs(10),
shutdown_rx.clone(),
);

let ready = <_>::default();
let locality = self.locality.locality();
self.providers
.spawn_providers(&config, ready, locality.clone());
self.service.spawn_services(&config, &shutdown_rx)?;

let locality = self.locality.locality();
match (self.command, mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
}
(Commands::Proxy(runner), Admin::Proxy(ready)) => {
runner.run(config, ready, tx, shutdown_rx).await
}
(Commands::Manage(manager), Admin::Manage(ready)) => {
manager.run(locality, config, ready, shutdown_rx).await
}
(Commands::Relay(relay), Admin::Relay(ready)) => {
relay.run(locality, config, ready, shutdown_rx).await
if let Some(mode) = mode {
match (self.command.unwrap(), mode) {
(Commands::Agent(agent), Admin::Agent(ready)) => {
agent.run(locality, config, ready, shutdown_rx).await
}
(Commands::Proxy(runner), Admin::Proxy(ready)) => {
runner.run(config, ready, tx, shutdown_rx).await
}
(Commands::Manage(manager), Admin::Manage(ready)) => {
manager.run(locality, config, ready, shutdown_rx).await
}
(Commands::Relay(relay), Admin::Relay(ready)) => {
relay.run(locality, config, ready, shutdown_rx).await
}
_ => unreachable!(),
}
_ => unreachable!(),
} else {
shutdown_rx.changed().await.map_err(From::from)
}
}

Expand Down
34 changes: 29 additions & 5 deletions src/cli/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ impl Service {
self
}

/// Sets the xDS service port.
pub fn any_service_enabled(&self) -> bool {
self.udp_enabled
|| self.qcmp_enabled
|| self.phoenix_enabled
|| self.xds_enabled
|| self.mds_enabled
}

/// The main entrypoint for listening network servers. When called will
/// spawn any and all enabled services, if successful returning a future
/// that can be await to wait on services to be cancelled.
Expand Down Expand Up @@ -202,6 +211,7 @@ impl Service {
shutdown_rx: &crate::signal::ShutdownRx,
) -> crate::Result<impl std::future::Future<Output = crate::Result<()>>> {
if self.phoenix_enabled {
tracing::info!(port=%self.qcmp_port, "starting phoenix service");
let phoenix = crate::net::TcpListener::bind(Some(self.phoenix_port))?;
crate::net::phoenix::spawn(
phoenix,
Expand All @@ -220,6 +230,7 @@ impl Service {
shutdown_rx: &crate::signal::ShutdownRx,
) -> crate::Result<impl Future<Output = crate::Result<()>>> {
if self.qcmp_enabled {
tracing::info!(port=%self.qcmp_port, "starting qcmp service");
let qcmp = crate::net::raw_socket_with_reuse(self.qcmp_port)?;
crate::codec::qcmp::spawn(qcmp, shutdown_rx.clone())?;
}
Expand All @@ -238,6 +249,7 @@ impl Service {

use futures::TryFutureExt as _;

tracing::info!(port=%self.mds_port, "starting mds service");
let listener = crate::net::TcpListener::bind(Some(self.mds_port))?;

Ok(either::Right(
Expand Down Expand Up @@ -291,6 +303,8 @@ impl Service {
return Ok((either::Left(std::future::pending()), None));
}

tracing::info!(port=%self.udp_port, "starting udp service");

#[cfg(target_os = "linux")]
{
match self.spawn_xdp(config.clone(), self.xdp.force_xdp) {
Expand Down Expand Up @@ -368,6 +382,7 @@ impl Service {
eyre::bail!("XDP currently disabled by default");
}

tracing::info!(port=%self.mds_port, "setting up xdp module");
let workers = xdp::setup_xdp_io(xdp::XdpConfig {
nic: self
.xdp
Expand Down Expand Up @@ -396,31 +411,40 @@ pub struct XdpOptions {
/// If not specified quilkin will attempt to determine the most appropriate
/// network interface to use. Quilkin will exit with an error if the network
/// interface does not exist, or a suitable default cannot be determined.
#[clap(long = "service.udp.xdp.network-interface")]
#[clap(
long = "service.udp.xdp.network-interface",
env = "QUILKIN_SERVICE_UDP_XDP_NETWORK_INTERFACE"
)]
pub network_interface: Option<String>,
/// Forces the use of XDP.
///
/// If XDP is not available on the chosen NIC, Quilkin exits with an error.
/// If false, io-uring will be used as the fallback implementation.
#[clap(long = "service.udp.xdp")]
#[clap(long = "service.udp.xdp", env = "QUILKIN_SERVICE_UDP_XDP")]
pub force_xdp: bool,
/// Forces the use of [`XDP_ZEROCOPY`](https://www.kernel.org/doc/html/latest/networking/af_xdp.html#xdp-copy-and-xdp-zerocopy-bind-flags)
///
/// If zero copy is not available on the chosen NIC, Quilkin exits with an error
#[clap(long = "service.udp.xdp.zerocopy")]
#[clap(
long = "service.udp.xdp.zerocopy",
env = "QUILKIN_SERVICE_UDP_XDP_ZEROCOPY"
)]
pub force_zerocopy: bool,
/// Forces the use of [TX checksum offload](https://docs.kernel.org/6.8/networking/xsk-tx-metadata.html)
///
/// TX checksum offload is an optional feature allowing the data portion of
/// a packet to have its internet checksum calculation offloaded to the NIC,
/// as otherwise this is done in software
#[clap(long = "service.udp.xdp.tco")]
#[clap(long = "service.udp.xdp.tco", env = "QUILKIN_SERVICE_UDP_XDP_TCO")]
pub force_tx_checksum_offload: bool,
/// The maximum amount of memory mapped for packet buffers, in bytes
///
/// If not specified, this defaults to 4MiB (2k allocated packets of 2k each at a time)
/// per NIC queue, ie 128MiB on a 32 queue NIC
#[clap(long = "service.udp.xdp.memory-limit")]
#[clap(
long = "service.udp.xdp.memory-limit",
env = "QUILKIN_SERVICE_UDP_XDP_MEMORY_LIMIT"
)]
pub maximum_memory: Option<u64>,
}

Expand Down
Loading
Loading