Skip to content

Commit

Permalink
chore: move feature-gate to controller, tidy up logs
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <[email protected]>
  • Loading branch information
Abhinandan-Purkait committed Dec 12, 2024
1 parent 212c8c2 commit f1d7cd4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 50 deletions.
12 changes: 12 additions & 0 deletions control-plane/csi-driver/src/bin/controller/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) struct CsiControllerConfig {
node_selector: HashMap<String, String>,
/// Max Outstanding Create Volume Requests.
create_volume_limit: usize,
/// Disable Stale Cleanup.
disable_stale_cleanup: bool,
}

impl CsiControllerConfig {
Expand Down Expand Up @@ -43,11 +45,17 @@ impl CsiControllerConfig {
.map(|s| s.map(|s| s.as_str())),
)?;

let disable_stale_cleanup = args.get_flag("disable_stale_cleanup");
if !disable_stale_cleanup {
tracing::warn!("Stale entry cleanup is disabled!")
}

CONFIG.get_or_init(|| Self {
rest_endpoint: rest_endpoint.into(),
io_timeout: io_timeout.into(),
node_selector,
create_volume_limit,
disable_stale_cleanup,
});
Ok(())
}
Expand Down Expand Up @@ -78,4 +86,8 @@ impl CsiControllerConfig {
pub(crate) fn node_selector_segment(&self) -> HashMap<String, String> {
self.node_selector.clone()
}
/// Returns whether the pre-publish stale-entry cleanup feature is disabled.
///
/// NOTE(review): polarity looks inverted — the publish path issues the cleanup
/// RPC when this returns `true` (`if self.disable_stale_cleanup { ... issue_cleanup(...) }`
/// in controller.rs), and the CLI arg uses `ArgAction::SetFalse`, so the default
/// (flag absent) is `true`. Confirm the intended meaning of "disable" here.
/// NOTE(review): the value is read via `args.get_flag("disable_stale_cleanup")`
/// (underscores) while main.rs registers the arg id as "disable-stale-cleanup"
/// (hyphens) — clap ids must match exactly; verify this does not panic at startup.
pub(crate) fn disable_stale_cleanup(&self) -> bool {
    self.disable_stale_cleanup
}
}
36 changes: 23 additions & 13 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use utils::{dsp_created_by_key, DSP_OPERATOR};

use regex::Regex;
use std::{collections::HashMap, str::FromStr};
use stor_port::types::v0::openapi::models::AppNode;
use tonic::{Code, Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
Expand All @@ -33,13 +34,15 @@ const SNAPSHOT_NAME_PATTERN: &str =
#[derive(Debug)]
pub(crate) struct CsiControllerSvc {
    // Semaphore bounding the number of concurrently outstanding CreateVolume
    // requests; permits are acquired in create_volume_permit().
    create_volume_limiter: std::sync::Arc<tokio::sync::Semaphore>,
    // Cached from CsiControllerConfig::disable_stale_cleanup(); gates the
    // cleanup RPC issued to the csi-node before controller publish.
    // NOTE(review): publish runs the cleanup when this is `true`, which is the
    // opposite of what the name suggests — confirm intended polarity.
    disable_stale_cleanup: bool,
}
impl CsiControllerSvc {
/// Builds the controller service from the parsed configuration, caching the
/// create-volume concurrency limit and the stale-cleanup feature gate.
pub(crate) fn new(cfg: &CsiControllerConfig) -> Self {
    // Number of CreateVolume requests allowed to be in flight at once.
    let max_in_flight = cfg.create_volume_limit();
    Self {
        create_volume_limiter: std::sync::Arc::new(tokio::sync::Semaphore::new(max_in_flight)),
        disable_stale_cleanup: cfg.disable_stale_cleanup(),
    }
}
async fn create_volume_permit(&self) -> Result<tokio::sync::SemaphorePermit, tonic::Status> {
Expand Down Expand Up @@ -136,9 +139,14 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
}

#[tracing::instrument]
async fn issue_cleanup(endpoint: String, volume_id: String) -> Result<(), Status> {
tracing::info!("Issuing cleanup of stale entries before publish for volume: {volume_id}");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
async fn issue_cleanup(app_node: AppNode, volume_id: String) -> Result<(), Status> {
tracing::info!(
"Issuing cleanup for volume: {} to node {}, to endpoint {}",
volume_id,
app_node.id,
app_node.spec.endpoint
);
let mut client = NodePluginClient::connect(format!("http://{}", app_node.spec.endpoint))
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
client
Expand Down Expand Up @@ -534,13 +542,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
},
}
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
return spec.cordondrainstate.is_some();
}
false
}
Expand All @@ -554,17 +561,18 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
}
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
tracing::info!("Issuing clean up to node {}, to endpoint {}", app_node.id, app_node.spec.endpoint);
issue_cleanup(app_node.spec.endpoint, volume_id.to_string()).await?;
if self.disable_stale_cleanup {
// Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing
let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
issue_cleanup(app_node, volume_id.to_string()).await?;
}

// Volume is not published.
let v = RestApiClient::get_client()
Expand Down Expand Up @@ -820,7 +828,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
}))
}

#[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid), skip(self))]
#[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid
), skip(self))]
async fn create_snapshot(
&self,
request: tonic::Request<CreateSnapshotRequest>,
Expand Down Expand Up @@ -960,7 +969,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
Ok(Response::new(DeleteSnapshotResponse {}))
}

#[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid), skip(self))]
#[instrument(err, fields(volume.uuid = request.get_ref().source_volume_id, snapshot.source_uuid = request.get_ref().source_volume_id, snapshot.uuid
), skip(self))]
async fn list_snapshots(
&self,
request: tonic::Request<ListSnapshotsRequest>,
Expand Down
19 changes: 13 additions & 6 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,19 @@ async fn main() -> anyhow::Result<()> {
.help("Formatting style to be used while logging")
)
.arg(
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.arg(
Arg::new("disable-stale-cleanup")
.long("disable-stale-cleanup")
.action(clap::ArgAction::SetFalse)
.value_name("BOOLEAN")
.help("Enable cleanup of the stale entries before controller publish")
)
.get_matches();

utils::print_package_info!();
Expand Down
16 changes: 2 additions & 14 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};

const GRPC_PORT: u16 = 50051;

Expand Down Expand Up @@ -152,13 +152,6 @@ pub(super) async fn main() -> anyhow::Result<()> {
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.arg(
Arg::new("stale-entry-cleanup")
.long("stale-entry-cleanup")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.help("Enable cleanup of the stale entries before controller publish")
)
.subcommand(
clap::Command::new("fs-freeze")
.arg(
Expand Down Expand Up @@ -285,11 +278,6 @@ pub(super) async fn main() -> anyhow::Result<()> {

let registration_enabled = matches.get_flag("enable-registration");

let stale_entry_cleanup = matches.get_flag("stale-entry-cleanup");
if !stale_entry_cleanup {
warn!("Stale entry cleanup is disabled!")
}

// Parse instance and grpc endpoints from the command line arguments and validate.
let grpc_sock_addr = validate_endpoints(
matches.get_one::<String>("grpc-endpoint").unwrap(),
Expand All @@ -301,7 +289,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
*crate::config::config().nvme_as_mut() = TryFrom::try_from(&matches)?;
let (csi, grpc, registration) = tokio::join!(
CsiServer::run(csi_socket, &matches)?,
NodePluginGrpcServer::run(grpc_sock_addr, stale_entry_cleanup),
NodePluginGrpcServer::run(grpc_sock_addr),
run_registration_loop(
node_name.clone(),
grpc_sock_addr.to_string(),
Expand Down
21 changes: 4 additions & 17 deletions control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use tracing::{debug, error, info};
use uuid::Uuid;

#[derive(Debug, Default)]
pub(crate) struct NodePluginSvc {
stale_entry_cleanup: bool,
}
pub(crate) struct NodePluginSvc {}

#[tonic::async_trait]
impl NodePlugin for NodePluginSvc {
Expand Down Expand Up @@ -70,20 +68,14 @@ impl NodePlugin for NodePluginSvc {
&self,
request: Request<CleanupRequest>,
) -> Result<Response<CleanupReply>, Status> {
if !self.stale_entry_cleanup {
return Ok(Response::new(CleanupReply {}));
}
let volume_id = request.into_inner().volume_id;
debug!("Starting cleanup for volume: {volume_id}");

let nqn = format!("{}:{volume_id}", nvme_target_nqn_prefix());
match Subsystem::try_from_nqn(&nqn) {
Ok(subsystem_paths) => {
for subsystem_path in subsystem_paths {
debug!(
"Processing subsystem path: addr: {:?}, transport: {}, state: {}",
subsystem_path.address, subsystem_path.transport, subsystem_path.state
);
debug!("Processing subsystem: {subsystem_path:?}");
if !subsystem_path.state.contains("deleting") {
runtime::spawn_blocking(move || {
subsystem_path
Expand Down Expand Up @@ -148,18 +140,13 @@ pub(crate) struct NodePluginGrpcServer {}

impl NodePluginGrpcServer {
/// Run `Self` as a tonic server.
pub(crate) async fn run(
endpoint: std::net::SocketAddr,
stale_entry_cleanup: bool,
) -> anyhow::Result<()> {
pub(crate) async fn run(endpoint: std::net::SocketAddr) -> anyhow::Result<()> {
info!(
"node plugin gRPC server configured at address {:?}",
endpoint
);
Server::builder()
.add_service(NodePluginServer::new(NodePluginSvc {
stale_entry_cleanup,
}))
.add_service(NodePluginServer::new(NodePluginSvc {}))
.serve_with_shutdown(endpoint, Shutdown::wait())
.await
.map_err(|error| {
Expand Down

0 comments on commit f1d7cd4

Please sign in to comment.