diff --git a/control-plane/csi-driver/src/bin/controller/config.rs b/control-plane/csi-driver/src/bin/controller/config.rs
index 244264307..9a3e8aa86 100755
--- a/control-plane/csi-driver/src/bin/controller/config.rs
+++ b/control-plane/csi-driver/src/bin/controller/config.rs
@@ -15,6 +15,8 @@ pub(crate) struct CsiControllerConfig {
     node_selector: HashMap,
     /// Max Outstanding Create Volume Requests.
     create_volume_limit: usize,
+    /// When set, the stale entry cleanup before controller publish is skipped.
+    disable_stale_cleanup: bool,
 }
 
 impl CsiControllerConfig {
@@ -43,11 +45,17 @@ impl CsiControllerConfig {
                 .map(|s| s.map(|s| s.as_str())),
         )?;
 
+        let disable_stale_cleanup = args.get_flag("disable-stale-cleanup");
+        if disable_stale_cleanup {
+            tracing::warn!("Stale entry cleanup is disabled!")
+        }
+
         CONFIG.get_or_init(|| Self {
             rest_endpoint: rest_endpoint.into(),
             io_timeout: io_timeout.into(),
             node_selector,
             create_volume_limit,
+            disable_stale_cleanup,
         });
         Ok(())
     }
@@ -78,4 +86,8 @@ impl CsiControllerConfig {
     pub(crate) fn node_selector_segment(&self) -> HashMap {
         self.node_selector.clone()
     }
+    /// Whether the stale entry cleanup before controller publish is disabled.
+    pub(crate) fn disable_stale_cleanup(&self) -> bool {
+        self.disable_stale_cleanup
+    }
 }
diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs
index 616470c9d..ff9a4f83a 100644
--- a/control-plane/csi-driver/src/bin/controller/controller.rs
+++ b/control-plane/csi-driver/src/bin/controller/controller.rs
@@ -20,6 +20,7 @@ use utils::{dsp_created_by_key, DSP_OPERATOR};
 
 use regex::Regex;
 use std::{collections::HashMap, str::FromStr};
+use stor_port::types::v0::openapi::models::AppNode;
 use tonic::{Code, Request, Response, Status};
 use tracing::{debug, error, instrument, trace, warn};
 use uuid::Uuid;
@@ -33,6 +34,7 @@ const SNAPSHOT_NAME_PATTERN: &str =
 #[derive(Debug)]
 pub(crate) struct CsiControllerSvc {
     create_volume_limiter: std::sync::Arc,
+    disable_stale_cleanup: bool,
 }
 impl CsiControllerSvc {
     pub(crate) fn new(cfg: &CsiControllerConfig) -> Self {
@@ -40,6 +42,7 @@ impl CsiControllerSvc {
             create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new(
                 cfg.create_volume_limit(),
             )),
+            disable_stale_cleanup: cfg.disable_stale_cleanup(),
         }
     }
     async fn create_volume_permit(&self) -> Result {
@@ -136,9 +139,14 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
 }
 
 #[tracing::instrument]
-async fn issue_cleanup(endpoint: String, volume_id: String) -> Result<(), Status> {
-    tracing::info!("Issuing cleanup of stale entries before publish for volume: {volume_id}");
-    let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
+async fn issue_cleanup(app_node: AppNode, volume_id: String) -> Result<(), Status> {
+    tracing::info!(
+        "Issuing cleanup for volume: {} to node {}, to endpoint {}",
+        volume_id,
+        app_node.id,
+        app_node.spec.endpoint
+    );
+    let mut client = NodePluginClient::connect(format!("http://{}", app_node.spec.endpoint))
         .await
         .map_err(|error| Status::failed_precondition(error.to_string()))?;
     client
@@ -534,13 +542,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                         error!("{}", m);
                         return Err(Status::internal(m));
                     }
-                },
+                }
                 _ => {
-
                     // Check for node being cordoned.
                     fn cordon_check(spec: Option<&NodeSpec>) -> bool {
                         if let Some(spec) = spec {
-                            return spec.cordondrainstate.is_some()
+                            return spec.cordondrainstate.is_some();
                         }
                         false
                     }
@@ -554,17 +561,18 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
                         // then let the control-plane decide where to place the target. Node should not be cordoned.
                         Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
                             Ok(None)
-                        },
+                        }
                         // For 1-replica volumes, don't pre-select the target node. This will allow the
                         // control-plane to pin the target to the replica node.
                         Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
                         Ok(_) => Ok(Some(node_id.as_str())),
                     }?;
 
-                    // Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
-                    let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
-                    tracing::info!("Issuing clean up to node {}, to endpoint {}", app_node.id, app_node.spec.endpoint);
-                    issue_cleanup(app_node.spec.endpoint, volume_id.to_string()).await?;
+                    if !self.disable_stale_cleanup {
+                        // Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
+                        let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
+                        issue_cleanup(app_node, volume_id.to_string()).await?;
+                    }
 
                     // Volume is not published.
                     let v = RestApiClient::get_client()
@@ -820,7 +828,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
         }))
     }
 
-    #[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid), skip(self))]
+    #[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid
+    ), skip(self))]
     async fn create_snapshot(
         &self,
         request: tonic::Request,
@@ -960,7 +969,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
         Ok(Response::new(DeleteSnapshotResponse {}))
     }
 
-    #[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid), skip(self))]
+    #[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid
+    ), skip(self))]
     async fn list_snapshots(
         &self,
         request: tonic::Request,
diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs
index eaa4fc291..0b3c8a0d8 100644
--- a/control-plane/csi-driver/src/bin/controller/main.rs
+++ b/control-plane/csi-driver/src/bin/controller/main.rs
@@ -111,12 +111,19 @@ async fn main() -> anyhow::Result<()> {
                 .help("Formatting style to be used while logging")
         )
         .arg(
             Arg::new("ansi-colors")
                 .long("ansi-colors")
                 .default_value("true")
                 .value_parser(clap::value_parser!(bool))
                 .help("Enable ansi color for logs")
         )
+        .arg(
+            Arg::new("disable-stale-cleanup")
+                .long("disable-stale-cleanup")
+                .action(clap::ArgAction::SetTrue)
+                .value_name("BOOLEAN")
+                .help("Disable cleanup of the stale entries before controller publish")
+        )
         .get_matches();
 
     utils::print_package_info!();
diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs
index c85874358..a0544e09e 100644
--- a/control-plane/csi-driver/src/bin/node/main_.rs
+++ b/control-plane/csi-driver/src/bin/node/main_.rs
@@ -29,7 +29,7 @@ use std::{
 use tokio::net::UnixListener;
 use tokio_stream::wrappers::UnixListenerStream;
 use tonic::transport::Server;
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info};
 
 const GRPC_PORT: u16 = 50051;
 
@@ -152,13 +152,6 @@ pub(super) async fn main() -> anyhow::Result<()> {
                 .value_parser(clap::value_parser!(bool))
                 .help("Enable ansi color for logs")
         )
-        .arg(
-            Arg::new("stale-entry-cleanup")
-                .long("stale-entry-cleanup")
-                .action(clap::ArgAction::SetTrue)
-                .value_name("BOOLEAN")
-                .help("Enable cleanup of the stale entries before controller publish")
-        )
         .subcommand(
             clap::Command::new("fs-freeze")
                 .arg(
@@ -285,11 +278,6 @@ pub(super) async fn main() -> anyhow::Result<()> {
 
     let registration_enabled = matches.get_flag("enable-registration");
 
-    let stale_entry_cleanup = matches.get_flag("stale-entry-cleanup");
-    if !stale_entry_cleanup {
-        warn!("Stale entry cleanup is disabled!")
-    }
-
     // Parse instance and grpc endpoints from the command line arguments and validate.
     let grpc_sock_addr = validate_endpoints(
         matches.get_one::("grpc-endpoint").unwrap(),
@@ -301,7 +289,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
     *crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
     let (csi, grpc, registration) = tokio::join!(
         CsiServer::run(csi_socket, &matches)?,
-        NodePluginGrpcServer::run(grpc_sock_addr, stale_entry_cleanup),
+        NodePluginGrpcServer::run(grpc_sock_addr),
         run_registration_loop(
             node_name.clone(),
             grpc_sock_addr.to_string(),
diff --git a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
index 16cdf27dd..3c2ce1948 100644
--- a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
+++ b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
@@ -28,9 +28,7 @@ use tracing::{debug, error, info};
 use uuid::Uuid;
 
 #[derive(Debug, Default)]
-pub(crate) struct NodePluginSvc {
-    stale_entry_cleanup: bool,
-}
+pub(crate) struct NodePluginSvc {}
 
 #[tonic::async_trait]
 impl NodePlugin for NodePluginSvc {
@@ -70,9 +68,6 @@ impl NodePlugin for NodePluginSvc {
         &self,
         request: Request,
     ) -> Result, Status> {
-        if !self.stale_entry_cleanup {
-            return Ok(Response::new(CleanupReply {}));
-        }
         let volume_id = request.into_inner().volume_id;
         debug!("Starting cleanup for volume: {volume_id}");
 
@@ -80,10 +75,7 @@ impl NodePlugin for NodePluginSvc {
         match Subsystem::try_from_nqn(&nqn) {
             Ok(subsystem_paths) => {
                 for subsystem_path in subsystem_paths {
-                    debug!(
-                        "Processing subsystem path: addr: {:?}, transport: {}, state: {}",
-                        subsystem_path.address, subsystem_path.transport, subsystem_path.state
-                    );
+                    debug!("Processing subsystem: {subsystem_path:?}");
                     if !subsystem_path.state.contains("deleting") {
                         runtime::spawn_blocking(move || {
                             subsystem_path
@@ -148,18 +140,13 @@ pub(crate) struct NodePluginGrpcServer {}
 
 impl NodePluginGrpcServer {
     /// Run `Self` as a tonic server.
-    pub(crate) async fn run(
-        endpoint: std::net::SocketAddr,
-        stale_entry_cleanup: bool,
-    ) -> anyhow::Result<()> {
+    pub(crate) async fn run(endpoint: std::net::SocketAddr) -> anyhow::Result<()> {
         info!(
             "node plugin gRPC server configured at address {:?}",
             endpoint
         );
         Server::builder()
-            .add_service(NodePluginServer::new(NodePluginSvc {
-                stale_entry_cleanup,
-            }))
+            .add_service(NodePluginServer::new(NodePluginSvc {}))
             .serve_with_shutdown(endpoint, Shutdown::wait())
             .await
             .map_err(|error| {