From 827986c34d95d09bbd77bb5d1e676228d420a479 Mon Sep 17 00:00:00 2001 From: Abhinandan Purkait Date: Wed, 11 Dec 2024 10:46:33 +0000 Subject: [PATCH] feat(csi-driver): add pre-publish hook to cleanup stale entries Signed-off-by: Abhinandan Purkait --- .../csi-driver/proto/node-service.proto | 10 ++ .../src/bin/controller/controller.rs | 82 ++++++++-- .../csi-driver/src/bin/controller/main.rs | 12 +- .../csi-driver/src/bin/node/findmnt.rs | 145 +++++++++++++++++- .../csi-driver/src/bin/node/mount.rs | 26 +++- control-plane/csi-driver/src/bin/node/node.rs | 2 +- .../src/bin/node/nodeplugin_grpc.rs | 87 ++++++++++- .../csi-driver/src/bin/node/shutdown_event.rs | 11 +- 8 files changed, 346 insertions(+), 29 deletions(-) diff --git a/control-plane/csi-driver/proto/node-service.proto b/control-plane/csi-driver/proto/node-service.proto index 7cb2b6390..8e2c5b450 100644 --- a/control-plane/csi-driver/proto/node-service.proto +++ b/control-plane/csi-driver/proto/node-service.proto @@ -20,6 +20,8 @@ service NodePlugin { rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {} // Find the volume identified by the volume ID, and return volume information. rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {} + // Force unstage the volume. + rpc ForceUnstageVolume (ForceUnstageVolumeRequest) returns (ForceUnstageVolumeReply) {} } enum VolumeType { @@ -54,3 +56,11 @@ message FindVolumeReply { optional VolumeType volume_type = 1; string device_path = 2; // the device path for the volume } + +// The request message containing ID of the volume to be force unstaged. +message ForceUnstageVolumeRequest { + string volume_id = 1; +} +// Message for response on volume force unstage. 
+message ForceUnstageVolumeReply { +} diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs index 6ac68a8ec..cf40e65da 100644 --- a/control-plane/csi-driver/src/bin/controller/controller.rs +++ b/control-plane/csi-driver/src/bin/controller/controller.rs @@ -4,7 +4,10 @@ use crate::{ }; use csi_driver::{ context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate}, - node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest}, + node::internal::{ + node_plugin_client::NodePluginClient, ForceUnstageVolumeRequest, FreezeFsRequest, + UnfreezeFsRequest, + }, }; use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *}; use stor_port::types::v0::openapi::{ @@ -14,11 +17,12 @@ use stor_port::types::v0::openapi::{ SpecStatus, Volume, VolumeShareProtocol, }, }; -use utils::{dsp_created_by_key, DSP_OPERATOR}; +use utils::{dsp_created_by_key, DEFAULT_REQ_TIMEOUT, DSP_OPERATOR}; use regex::Regex; -use std::{collections::HashMap, str::FromStr}; -use tonic::{Code, Request, Response, Status}; +use std::{collections::HashMap, str::FromStr, time::Duration}; +use stor_port::types::v0::openapi::models::AppNode; +use tonic::{transport::Uri, Code, Request, Response, Status}; use tracing::{debug, error, instrument, trace, warn}; use uuid::Uuid; use volume_capability::AccessType; @@ -91,12 +95,30 @@ fn volume_app_node(volume: &Volume) -> Option { } } -#[tracing::instrument] +/// Create a new endpoint that connects to the provided Uri. +/// This endpoint has default connect and request timeouts. 
+fn tonic_endpoint(endpoint: String) -> Result { + let uri = + Uri::try_from(endpoint).map_err(|error| Status::invalid_argument(error.to_string()))?; + + let timeout = humantime::parse_duration(DEFAULT_REQ_TIMEOUT).unwrap(); + Ok(tonic::transport::Endpoint::from(uri) + .connect_timeout(timeout) + .timeout(std::time::Duration::from_secs(30)) + .http2_keep_alive_interval(Duration::from_secs(5)) + .keep_alive_timeout(Duration::from_secs(10)) + .concurrency_limit(utils::DEFAULT_GRPC_CLIENT_CONCURRENCY)) +} + +#[tracing::instrument(err, skip_all)] async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> { trace!("Issuing fs freeze"); - let mut client = NodePluginClient::connect(format!("http://{endpoint}")) + let channel = tonic_endpoint(format!("http://{endpoint}"))? + .connect() .await - .map_err(|error| Status::failed_precondition(error.to_string()))?; + .map_err(|error| Status::unavailable(error.to_string()))?; + let mut client = NodePluginClient::new(channel); + match client .freeze_fs(Request::new(FreezeFsRequest { volume_id: volume_id.clone(), @@ -112,12 +134,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Stat } } -#[tracing::instrument] +#[tracing::instrument(err, skip_all)] async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> { trace!("Issuing fs unfreeze"); - let mut client = NodePluginClient::connect(format!("http://{endpoint}")) + let channel = tonic_endpoint(format!("http://{endpoint}"))? 
+ .connect() .await - .map_err(|error| Status::failed_precondition(error.to_string()))?; + .map_err(|error| Status::unavailable(error.to_string()))?; + let mut client = NodePluginClient::new(channel); + match client .unfreeze_fs(Request::new(UnfreezeFsRequest { volume_id: volume_id.clone(), @@ -133,6 +158,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St } } +#[tracing::instrument(err, skip_all)] +async fn force_unstage(app_node: AppNode, volume_id: String) -> Result<(), Status> { + tracing::info!( + "Issuing cleanup for volume: {} to node {}, to endpoint {}", + volume_id, + app_node.id, + app_node.spec.endpoint + ); + let channel = tonic_endpoint(format!("http://{}", app_node.spec.endpoint))? + .connect() + .await + .map_err(|error| Status::unavailable(error.to_string()))?; + let mut client = NodePluginClient::new(channel); + + client + .force_unstage_volume(Request::new(ForceUnstageVolumeRequest { + volume_id: volume_id.clone(), + })) + .await + .map(|_| ()) +} + /// Get share URI for existing volume object and the node where the volume is published. fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> { volume @@ -518,13 +565,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { error!("{}", m); return Err(Status::internal(m)); } - }, + } _ => { - // Check for node being cordoned. fn cordon_check(spec: Option<&NodeSpec>) -> bool { if let Some(spec) = spec { - return spec.cordondrainstate.is_some() + return spec.cordondrainstate.is_some(); } false } @@ -538,13 +584,21 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { // then let the control-plane decide where to place the target. Node should not be cordoned. Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => { Ok(None) - }, + } // For 1-replica volumes, don't pre-select the target node. 
This will allow the // control-plane to pin the target to the replica node. Ok(_) if volume.spec.num_replicas == 1 => Ok(None), Ok(_) => Ok(Some(node_id.as_str())), }?; + // Issue a cleanup rpc to csi node to ensure the subsystem doesn't have any path present before publishing + match RestApiClient::get_client().get_app_node(&args.node_id).await { + Ok(app_node) => force_unstage(app_node, volume_id.to_string()).await?, + Err(ApiClientError::ResourceNotExists(..)) => warn!("App node: {}, not found, skipping force unstage volume", args.node_id), + Err(error) => return Err(error.into()) + } + + // Volume is not published. let v = RestApiClient::get_client() .publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context) diff --git a/control-plane/csi-driver/src/bin/controller/main.rs b/control-plane/csi-driver/src/bin/controller/main.rs index eaa4fc291..931cbf5d8 100644 --- a/control-plane/csi-driver/src/bin/controller/main.rs +++ b/control-plane/csi-driver/src/bin/controller/main.rs @@ -111,12 +111,12 @@ async fn main() -> anyhow::Result<()> { .help("Formatting style to be used while logging") ) .arg( - Arg::new("ansi-colors") - .long("ansi-colors") - .default_value("true") - .value_parser(clap::value_parser!(bool)) - .help("Enable ansi color for logs") - ) + Arg::new("ansi-colors") + .long("ansi-colors") + .default_value("true") + .value_parser(clap::value_parser!(bool)) + .help("Enable ansi color for logs") + ) .get_matches(); utils::print_package_info!(); diff --git a/control-plane/csi-driver/src/bin/node/findmnt.rs b/control-plane/csi-driver/src/bin/node/findmnt.rs index 70837ac1e..729973536 100644 --- a/control-plane/csi-driver/src/bin/node/findmnt.rs +++ b/control-plane/csi-driver/src/bin/node/findmnt.rs @@ -4,6 +4,7 @@ use csi_driver::filesystem::FileSystem as Fs; use serde_json::Value; use std::{collections::HashMap, process::Command, str::FromStr, string::String, vec::Vec}; use tracing::{error, warn}; +use utils::csi_plugin_name; 
// Keys of interest we expect to find in the JSON output generated // by findmnt. @@ -13,11 +14,20 @@ const FSTYPE_KEY: &str = "fstype"; #[derive(Debug)] pub(crate) struct DeviceMount { - #[allow(dead_code)] mount_path: String, fstype: FileSystem, } +impl std::fmt::Display for DeviceMount { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MountPath: {}, FileSystem: {}", + self.mount_path, self.fstype + ) + } +} + impl DeviceMount { /// create new DeviceMount pub(crate) fn new(mount_path: String, fstype: FileSystem) -> DeviceMount { @@ -27,6 +37,10 @@ impl DeviceMount { pub(crate) fn fstype(&self) -> FileSystem { self.fstype.clone() } + /// Get the mount path. + pub(crate) fn mount_path(&self) -> String { + self.mount_path.clone() + } } #[derive(Debug)] @@ -213,3 +227,132 @@ pub(crate) fn get_mountpaths(device_path: &str) -> Result, Devi Err(e) => Err(e), } } + +const DRIVER_NAME: &str = "driverName"; +const VOLUME_HANDLE: &str = "volumeHandle"; +const METADATA_FILE: &str = "vol_data.json"; +const DEVICE_PATTERN: &str = "nvme"; + +/// This filter is used specifically to search for mountpoints in the context of k8s. +/// +/// # Fields +/// * `driver` - Name of the csi driver which provisioned the volume. +/// * `volume_id` - Name of the volume which is being searched for. +/// * `file_name` - Name of the file where kubelet stores the metadata. +/// * `device_pattern` - Pattern to search for a specific type of device, e.g. nvme. +#[derive(Debug)] +struct FilterCsiMounts<'a> { + driver: &'a str, + volume_id: &'a str, + file_name: &'a str, + device_pattern: &'a str, +} + +/// Finds CSI mount points using `findmnt` and filters based on the given criteria. 
+fn find_csi_mount(filter: FilterCsiMounts) -> Result>, DeviceError> { + let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?; + + if !output.status.success() { + return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str())); + } + + let json_str = String::from_utf8(output.stdout)?; + let json: Value = serde_json::from_str(&json_str)?; + + let mut results: Vec> = Vec::new(); + filter_csi_findmnt(&json, &filter, &mut results); + + Ok(results) +} + +/// Filters JSON output from `findmnt` for matching CSI mount points as per the filter. +fn filter_csi_findmnt( + json_val: &Value, + filter: &FilterCsiMounts, + results: &mut Vec>, +) { + match json_val { + Value::Array(json_array) => { + for jsonvalue in json_array { + filter_csi_findmnt(jsonvalue, filter, results); + } + } + Value::Object(json_map) => { + if let Some(source_value) = json_map.get(SOURCE_KEY) { + let source_str = key_adjusted_value(SOURCE_KEY, source_value); + if source_str.contains(filter.device_pattern) { + if let Some(target_value) = json_map.get(TARGET_KEY) { + let target_str = target_value.as_str().unwrap_or_default(); + + if let Some(parent_path) = std::path::Path::new(target_str).parent() { + let vol_data_path = parent_path.join(filter.file_name); + let vol_data_path_str = vol_data_path.to_string_lossy(); + + if let Ok(vol_data) = read_vol_data_json(&vol_data_path_str) { + if vol_data.get(DRIVER_NAME) + == Some(&Value::String(filter.driver.to_string())) + && vol_data.get(VOLUME_HANDLE) + == Some(&Value::String(filter.volume_id.to_string())) + { + results.push(jsonmap_to_hashmap(json_map)); + } + } + } + } + } + } + + for (_, jsonvalue) in json_map { + if jsonvalue.is_array() || jsonvalue.is_object() { + filter_csi_findmnt(jsonvalue, filter, results); + } + } + } + _ => (), + }; +} + +/// Reads and parses a file into a JSON object. 
+fn read_vol_data_json(path: &str) -> Result, DeviceError> { + let file = std::fs::File::open(path)?; + let reader = std::io::BufReader::new(file); + let json: serde_json::Map = serde_json::from_reader(reader)?; + Ok(json) +} + +/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file. +pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result, DeviceError> { + let filter = FilterCsiMounts { + driver: &csi_plugin_name(), + volume_id, + file_name: METADATA_FILE, + device_pattern: DEVICE_PATTERN, + }; + match find_csi_mount(filter) { + Ok(results) => { + let mut mountpaths: Vec = Vec::new(); + for entry in results { + if let Some(mountpath) = entry.get(TARGET_KEY) { + if let Some(fstype) = entry.get(FSTYPE_KEY) { + mountpaths.push(DeviceMount::new( + mountpath.to_string(), + Fs::from_str(fstype) + .unwrap_or(Fs::Unsupported(fstype.to_string())) + .into(), + )) + } else { + warn!(volume.id=%volume_id, "Missing fstype for {mountpath}"); + mountpaths.push(DeviceMount::new( + mountpath.to_string(), + Fs::Unsupported("".to_string()).into(), + )) + } + } else { + warn!(volume.id=%volume_id, ?entry, "Missing target field"); + } + } + Ok(mountpaths) + } + Err(e) => Err(e), + } +} diff --git a/control-plane/csi-driver/src/bin/node/mount.rs b/control-plane/csi-driver/src/bin/node/mount.rs index d33d58ee8..952ba7f7f 100644 --- a/control-plane/csi-driver/src/bin/node/mount.rs +++ b/control-plane/csi-driver/src/bin/node/mount.rs @@ -1,10 +1,11 @@ //! Utility functions for mounting and unmounting filesystems. 
-use crate::{filesystem_ops::FileSystem, runtime}; +use crate::{filesystem_ops::FileSystem, findmnt::DeviceMount, runtime}; use csi_driver::filesystem::FileSystem as Fs; use devinfo::mountinfo::{MountInfo, SafeMountIter}; use std::{collections::HashSet, io::Error}; use sys_mount::{unmount, FilesystemType, Mount, MountFlags, UnmountFlags}; +use tonic::Status; use tracing::{debug, info}; use uuid::Uuid; @@ -425,3 +426,26 @@ pub(crate) async fn unmount_on_fs_id_diff( ) }) } + +pub(crate) async fn lazy_unmount_mountpaths(mountpaths: &Vec) -> Result<(), Status> { + for mountpath in mountpaths { + debug!( + "Unmounting path: {}, with DETACH flag", + mountpath.mount_path() + ); + let target_path = mountpath.mount_path().to_string(); + + runtime::spawn_blocking({ + let target_path = target_path.clone(); + move || { + let mut unmount_flags = UnmountFlags::empty(); + unmount_flags.insert(UnmountFlags::DETACH); + sys_mount::unmount(target_path, unmount_flags) + .map_err(|error| Status::aborted(error.to_string())) + } + }) + .await + .map_err(|error| Status::aborted(error.to_string()))??; + } + Ok(()) +} diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs index 6ba80692b..3bb567d89 100644 --- a/control-plane/csi-driver/src/bin/node/node.rs +++ b/control-plane/csi-driver/src/bin/node/node.rs @@ -164,7 +164,7 @@ fn get_access_type(volume_capability: &Option) -> Result<&Acce /// Detach the nexus device from the system, either at volume unstage, /// or after failed filesystem mount at volume stage. 
-async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> { +pub(crate) async fn detach(uuid: &Uuid, errheader: String) -> Result<(), Status> { if let Some(device) = Device::lookup(uuid).await.map_err(|error| { failure!( Code::Internal, diff --git a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs index eeb894409..83cb93fb2 100644 --- a/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs +++ b/control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs @@ -3,26 +3,39 @@ //! node as a IoEngine CSI node plugin, but it is not possible to do so within //! the CSI framework. This service must be deployed on all nodes the //! IoEngine CSI node plugin is deployed. + use crate::{ + dev::Device, + findmnt, fsfreeze::{fsfreeze, FsFreezeOpt}, + mount::lazy_unmount_mountpaths, nodeplugin_svc, nodeplugin_svc::{find_mount, lookup_device}, + runtime, shutdown_event::Shutdown, }; -use csi_driver::node::internal::{ - node_plugin_server::{NodePlugin, NodePluginServer}, - FindVolumeReply, FindVolumeRequest, FreezeFsReply, FreezeFsRequest, UnfreezeFsReply, - UnfreezeFsRequest, VolumeType, +use csi_driver::{ + limiter::VolumeOpGuard, + node::internal::{ + node_plugin_server::{NodePlugin, NodePluginServer}, + FindVolumeReply, FindVolumeRequest, ForceUnstageVolumeReply, ForceUnstageVolumeRequest, + FreezeFsReply, FreezeFsRequest, UnfreezeFsReply, UnfreezeFsRequest, VolumeType, + }, }; use nodeplugin_svc::TypeOfMount; +use nvmeadm::{error::NvmeError, nvmf_subsystem::Subsystem}; +use utils::nvme_target_nqn_prefix; + use tonic::{transport::Server, Request, Response, Status}; use tracing::{debug, error, info}; +use uuid::Uuid; #[derive(Debug, Default)] pub(crate) struct NodePluginSvc {} #[tonic::async_trait] impl NodePlugin for NodePluginSvc { + #[tracing::instrument(err, skip_all)] async fn freeze_fs( &self, request: Request, @@ -32,6 +45,7 @@ impl NodePlugin for NodePluginSvc { Ok(Response::new(FreezeFsReply 
{})) } + #[tracing::instrument(err, skip_all)] async fn unfreeze_fs( &self, request: Request, @@ -54,6 +68,71 @@ impl NodePlugin for NodePluginSvc { device_path: device.devname(), })) } + + #[tracing::instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self, request))] + async fn force_unstage_volume( + &self, + request: Request, + ) -> Result, Status> { + let volume_id = request.into_inner().volume_id; + + let _guard = VolumeOpGuard::new_str(&volume_id)?; + + debug!("Starting cleanup for volume: {volume_id}"); + + let nqn = format!("{}:{volume_id}", nvme_target_nqn_prefix()); + match Subsystem::try_from_nqn(&nqn) { + Ok(subsystem_paths) => { + for subsystem_path in subsystem_paths { + info!("Processing subsystem: {subsystem_path:?}"); + if !subsystem_path.state.contains("deleting") { + runtime::spawn_blocking(move || { + subsystem_path + .disconnect() + .map_err(|error| Status::aborted(error.to_string())) + }) + .await + .map_err(|error| Status::aborted(error.to_string()))??; + } + } + Err(Status::internal(format!( + "Cleanup initiated for all stale entries of: {volume_id}. Returning error for validation on retry." + ))) + } + Err(NvmeError::NqnNotFound { .. 
}) => { + let uuid = Uuid::parse_str(&volume_id).map_err(|error| { + Status::invalid_argument(format!("Invalid volume UUID: {volume_id}, {error}")) + })?; + + if let Ok(Some(device)) = Device::lookup(&uuid).await { + let mountpaths = findmnt::get_mountpaths(&device.devname())?; + debug!( + "Device: {} found, with mount paths: {}, issuing unmount", + device.devname(), + mountpaths + .iter() + .map(|devmount| devmount.to_string()) + .collect::>() + .join(", ") + ); + lazy_unmount_mountpaths(&mountpaths).await?; + } else { + let mountpaths = findmnt::get_csi_mountpaths(&volume_id).await?; + debug!( + "Device was not found, detected mount paths: {}, issuing unmount", + mountpaths + .iter() + .map(|devmount| devmount.to_string()) + .collect::>() + .join(", ") + ); + lazy_unmount_mountpaths(&mountpaths).await?; + } + Ok(Response::new(ForceUnstageVolumeReply {})) + } + Err(error) => Err(Status::aborted(error.to_string())), + } + } } impl From for VolumeType { diff --git a/control-plane/csi-driver/src/bin/node/shutdown_event.rs b/control-plane/csi-driver/src/bin/node/shutdown_event.rs index 5ca9a9e64..80e06c0dc 100644 --- a/control-plane/csi-driver/src/bin/node/shutdown_event.rs +++ b/control-plane/csi-driver/src/bin/node/shutdown_event.rs @@ -13,8 +13,8 @@ mod tests { use csi_driver::node::internal::{ node_plugin_client::NodePluginClient, node_plugin_server::{NodePlugin, NodePluginServer}, - FindVolumeReply, FindVolumeRequest, FreezeFsReply, FreezeFsRequest, UnfreezeFsReply, - UnfreezeFsRequest, VolumeType, + FindVolumeReply, FindVolumeRequest, ForceUnstageVolumeReply, ForceUnstageVolumeRequest, + FreezeFsReply, FreezeFsRequest, UnfreezeFsReply, UnfreezeFsRequest, VolumeType, }; use std::{ str::FromStr, @@ -73,6 +73,13 @@ mod tests { device_path: "".to_string(), })) } + + async fn force_unstage_volume( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!() + } } /// Tests the shutdown of a tonic service.