Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csi-driver): add pre-publish hook to cleanup stale entries #905

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions control-plane/csi-driver/proto/node-service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ service NodePlugin {
rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
// Find the volume identified by the volume ID, and return volume information.
rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
// Force unstage the volume.
rpc ForceUnstageVolume (ForceUnstageVolumeRequest) returns (ForceUnstageVolumeReply) {}
}

enum VolumeType {
Expand Down Expand Up @@ -54,3 +56,11 @@ message FindVolumeReply {
optional VolumeType volume_type = 1;
string device_path = 2; // the device path for the volume
}

// Request message carrying the ID of the volume to be force unstaged.
message ForceUnstageVolumeRequest {
// ID of the volume to force unstage.
string volume_id = 1;
}
// Reply message for a volume force unstage request; it carries no fields.
message ForceUnstageVolumeReply {
}
14 changes: 14 additions & 0 deletions control-plane/csi-driver/src/bin/controller/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) struct CsiControllerConfig {
node_selector: HashMap<String, String>,
/// Max Outstanding Create Volume Requests.
create_volume_limit: usize,
/// Force unstage volume.
force_unstage_volume: bool,
}

impl CsiControllerConfig {
Expand Down Expand Up @@ -43,11 +45,19 @@ impl CsiControllerConfig {
.map(|s| s.map(|s| s.as_str())),
)?;

let force_unstage_volume = args.get_flag("force-unstage-volume");
if !force_unstage_volume {
tracing::warn!(
"Force unstage volume is disabled, can trigger potential data corruption!"
);
}

CONFIG.get_or_init(|| Self {
rest_endpoint: rest_endpoint.into(),
io_timeout: io_timeout.into(),
node_selector,
create_volume_limit,
force_unstage_volume,
});
Ok(())
}
Expand Down Expand Up @@ -78,4 +88,8 @@ impl CsiControllerConfig {
pub(crate) fn node_selector_segment(&self) -> HashMap<String, String> {
self.node_selector.clone()
}
/// Whether the force unstage volume feature is enabled, as read from the
/// `force-unstage-volume` argument when the configuration was initialised.
pub(crate) fn force_unstage_volume(&self) -> bool {
self.force_unstage_volume
}
}
85 changes: 69 additions & 16 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ use crate::{
};
use csi_driver::{
context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
node::internal::{
node_plugin_client::NodePluginClient, ForceUnstageVolumeRequest, FreezeFsRequest,
UnfreezeFsRequest,
},
};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
models,
models::{
AffinityGroup, LabelledTopology, NodeSpec, NodeStatus, Pool, PoolStatus, PoolTopology,
SpecStatus, Volume, VolumeShareProtocol,
AffinityGroup, AppNode, LabelledTopology, NodeSpec, NodeStatus, Pool, PoolStatus,
PoolTopology, SpecStatus, Volume, VolumeShareProtocol,
},
};
use utils::{dsp_created_by_key, DSP_OPERATOR};
use utils::{dsp_created_by_key, DEFAULT_REQ_TIMEOUT, DSP_OPERATOR};

use regex::Regex;
use std::{collections::HashMap, str::FromStr};
use tonic::{Code, Request, Response, Status};
use std::{collections::HashMap, str::FromStr, time::Duration};
use tonic::{transport::Uri, Code, Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
use volume_capability::AccessType;
Expand All @@ -31,13 +34,15 @@ const SNAPSHOT_NAME_PATTERN: &str =
#[derive(Debug)]
pub(crate) struct CsiControllerSvc {
/// Semaphore created with the configured create-volume limit as its number of permits.
create_volume_limiter: std::sync::Arc<tokio::sync::Semaphore>,
/// Copy of the `force_unstage_volume` configuration flag; when set, a force unstage
/// request is sent to the application node before the volume is published.
force_unstage_volume: bool,
}
impl CsiControllerSvc {
/// Build the controller service from the provided configuration.
///
/// The create-volume limiter is a semaphore whose permit count is the configured
/// create-volume limit; the force unstage flag is copied from the configuration.
pub(crate) fn new(cfg: &CsiControllerConfig) -> Self {
    let limiter = tokio::sync::Semaphore::new(cfg.create_volume_limit());
    let force_unstage_volume = cfg.force_unstage_volume();
    Self {
        create_volume_limiter: std::sync::Arc::new(limiter),
        force_unstage_volume,
    }
}
async fn create_volume_permit(&self) -> Result<tokio::sync::SemaphorePermit, tonic::Status> {
Expand Down Expand Up @@ -91,12 +96,30 @@ fn volume_app_node(volume: &Volume) -> Option<String> {
}
}

#[tracing::instrument]
/// Create a new endpoint that connects to the provided Uri.
/// This endpoint has default connect and request timeouts.
fn tonic_endpoint(endpoint: String) -> Result<tonic::transport::Endpoint, Status> {
let uri =
Uri::try_from(endpoint).map_err(|error| Status::invalid_argument(error.to_string()))?;

let timeout = humantime::parse_duration(DEFAULT_REQ_TIMEOUT).unwrap();
Ok(tonic::transport::Endpoint::from(uri)
.connect_timeout(timeout)
.timeout(std::time::Duration::from_secs(30))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(10))
.concurrency_limit(utils::DEFAULT_GRPC_CLIENT_CONCURRENCY))
}

#[tracing::instrument(err, skip_all)]
async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs freeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.freeze_fs(Request::new(FreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -112,12 +135,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Stat
}
}

#[tracing::instrument]
#[tracing::instrument(err, skip_all)]
async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs unfreeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.unfreeze_fs(Request::new(UnfreezeFsRequest {
volume_id: volume_id.clone(),
Expand All @@ -133,6 +159,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), St
}
}

/// Ask the node plugin on the given application node to force unstage a volume.
///
/// Connects to `http://<app_node.spec.endpoint>` using the endpoint settings from
/// `tonic_endpoint` and issues a `ForceUnstageVolume` request for `volume_id`.
///
/// # Errors
/// - any error returned by `tonic_endpoint` when building the endpoint;
/// - `Status::unavailable` if the connection to the node plugin cannot be established;
/// - the `Status` returned by the node plugin if the request itself fails.
#[tracing::instrument(err, skip_all)]
async fn force_unstage(app_node: AppNode, volume_id: String) -> Result<(), Status> {
    tracing::info!(
        "Issuing cleanup for volume: {} to node {}, to endpoint {}",
        volume_id,
        app_node.id,
        app_node.spec.endpoint
    );
    let channel = tonic_endpoint(format!("http://{}", app_node.spec.endpoint))?
        .connect()
        .await
        .map_err(|error| Status::unavailable(error.to_string()))?;
    let mut client = NodePluginClient::new(channel);

    // `volume_id` is owned and not used afterwards, so move it into the request
    // instead of cloning it.
    client
        .force_unstage_volume(Request::new(ForceUnstageVolumeRequest { volume_id }))
        .await?;
    Ok(())
}

/// Get share URI for existing volume object and the node where the volume is published.
fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
volume
Expand Down Expand Up @@ -518,13 +566,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
},
}
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
return spec.cordondrainstate.is_some();
}
false
}
Expand All @@ -538,13 +585,19 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
}
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup RPC to the CSI node to ensure the subsystem has no paths present before publishing.
if self.force_unstage_volume {
let app_node = RestApiClient::get_client().get_app_node(&args.node_id).await?;
force_unstage(app_node, volume_id.to_string()).await?;
}

// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
Expand Down
19 changes: 13 additions & 6 deletions control-plane/csi-driver/src/bin/controller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,19 @@ async fn main() -> anyhow::Result<()> {
.help("Formatting style to be used while logging")
)
.arg(
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.arg(
Arg::new("force-unstage-volume")
.long("force-unstage-volume")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable force unstage volume feature")
)
.get_matches();

utils::print_package_info!();
Expand Down
Loading
Loading