Skip to content

Commit

Permalink
feat: Add xDS provider (#1087)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Feb 3, 2025
1 parent 0181f37 commit 5d4f7e8
Showing 1 changed file with 73 additions and 4 deletions.
77 changes: 73 additions & 4 deletions src/config/providersv2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(2);

/// The available xDS source provider.
#[derive(Clone, Debug, Default, clap::Args)]
#[command(next_help_heading = "Provider Options")]
pub struct Providers {
/// Watches Agones' game server CRDs for `Allocated` game server endpoints,
/// and for a `ConfigMap` that specifies the filter configuration.
Expand Down Expand Up @@ -101,19 +102,19 @@ pub struct Providers {
long = "provider.mds.endpoints",
env = "QUILKIN_PROVIDERS_MDS_ENDPOINTS"
)]
pub relay: Vec<tonic::transport::Endpoint>,
relay: Vec<tonic::transport::Endpoint>,
/// The remote URL or local file path to retrieve the Maxmind database.
#[clap(
long = "provider.mmdb.endpoints",
env = "QUILKIN_PROVIDERS_MMDB_ENDPOINTS"
)]
pub mmdb: Option<crate::net::maxmind_db::Source>,
mmdb: Option<crate::net::maxmind_db::Source>,
/// One or more socket addresses to forward packets to.
#[clap(
long = "provider.static.endpoints",
env = "QUILKIN_PROVIDERS_STATIC_ENDPOINTS"
)]
pub endpoints: Vec<SocketAddr>,
endpoints: Vec<SocketAddr>,
/// Assigns dynamic tokens to each address in the `--to` argument
///
/// Format is `<number of unique tokens>:<length of token suffix for each packet>`
Expand All @@ -122,10 +123,46 @@ pub struct Providers {
env = "QUILKIN_PROVIDERS_STATIC_ENDPOINT_TOKENS",
requires("endpoints")
)]
pub endpoint_tokens: Option<String>,
endpoint_tokens: Option<String>,

/// Enabled the xDS (Discovery Service) provider.
#[arg(
long = "provider.xds",
env = "QUILKIN_PROVIDERS_XDS",
conflicts_with("k8s_enabled"),
conflicts_with("fs_enabled"),
default_value_t = false
)]
provider_xds_enabled: bool,
/// One or more xDS service endpoints to listen for config changes.
#[clap(
long = "provider.xds.endpoints",
env = "QUILKIN_PROVIDERS_XDS_ENDPOINTS"
)]
xds_endpoints: Vec<tonic::transport::Endpoint>,
}

impl Providers {
#[allow(clippy::type_complexity)]
const SUBS: &[(&str, &[(&str, Vec<String>)])] = &[
(
"9",
&[
(crate::xds::CLUSTER_TYPE, Vec::new()),
(crate::xds::DATACENTER_TYPE, Vec::new()),
(crate::xds::FILTER_CHAIN_TYPE, Vec::new()),
],
),
(
"",
&[
(crate::xds::CLUSTER_TYPE, Vec::new()),
(crate::xds::DATACENTER_TYPE, Vec::new()),
(crate::xds::LISTENER_TYPE, Vec::new()),
],
),
];

pub fn agones(mut self) -> Self {
self.agones_enabled = true;
self
Expand Down Expand Up @@ -352,6 +389,36 @@ impl Providers {
}
}

pub fn spawn_xds_provider(
self,
config: &std::sync::Arc<crate::Config>,
health_check: Arc<AtomicBool>,
) -> tokio::task::JoinHandle<crate::Result<()>> {
let config = config.clone();
let endpoints = self.xds_endpoints;
let tx = Option::<tokio::sync::mpsc::UnboundedSender<String>>::None;
tokio::spawn(Self::task(health_check.clone(), move || {
let config = config.clone();
let endpoints = endpoints.clone();
let health_check = health_check.clone();
let tx = tx.clone();
async move {
let client = crate::net::xds::AdsClient::connect(
String::clone(&config.id.load()),
endpoints,
)
.await?;

let _stream = client
.delta_subscribe(config.clone(), health_check.clone(), tx, Self::SUBS)
.await
.map_err(|_| eyre::eyre!("failed to acquire delta stream"))?;

std::future::pending().await
}
}))
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn spawn_providers(
self,
Expand All @@ -361,6 +428,8 @@ impl Providers {
) -> tokio::task::JoinHandle<crate::Result<()>> {
if self.k8s_enabled || self.agones_enabled {
self.spawn_k8s_provider(health_check, locality, config.clone())
} else if self.provider_xds_enabled {
self.spawn_xds_provider(config, health_check)
} else if self.fs_enabled {
let config = config.clone();
tokio::spawn(Self::task(health_check.clone(), {
Expand Down

0 comments on commit 5d4f7e8

Please sign in to comment.