chore(bors): merge pull request #905
905: feat(csi-driver): add pre-publish hook to cleanup stale entries r=Abhinandan-Purkait a=Abhinandan-Purkait

Scenario:
If a node undergoes a network partition and the application pods on it are moved away by the out-of-service taint, stale mount paths and NVMe controllers can be left behind on that node because CSI NodeUnstage was never called.
Once the node rejoins the cluster and the application pod is scheduled back onto the same node, the stale NVMe path can become live again, flushing the old buffered data and potentially causing data corruption.

Changes:
This adds a pre-publish hook that intercepts ControllerPublish and issues a cleanup on the node if stale entries are detected.
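
As a rough orientation for the controller change in the diff below, the pre-publish flow reduces to the sketch that follows. This is a minimal sketch only: AppNode, AppNodeLookup, and NodePlugin are hypothetical stand-ins for the control-plane REST types and the tonic-generated node-plugin client, and the real implementation is async and returns tonic Status errors.

/// Hypothetical stand-in for the control-plane's app-node record.
struct AppNode {
    id: String,
    endpoint: String,
}

/// Hypothetical stand-in for the result of looking up the app node over REST.
enum AppNodeLookup {
    Found(AppNode),
    NotFound,
    Failed(String),
}

/// Hypothetical stand-in for the tonic-generated NodePluginClient.
trait NodePlugin {
    fn force_unstage_volume(&mut self, volume_id: &str) -> Result<(), String>;
}

/// Pre-publish hook: before the controller publishes a volume to a node, ask the
/// csi-node plugin on that node to tear down any stale mounts and NVMe controllers
/// left behind by a missed NodeUnstage.
fn pre_publish_cleanup(
    lookup: AppNodeLookup,
    plugin: &mut impl NodePlugin,
    volume_id: &str,
) -> Result<(), String> {
    match lookup {
        // The node has no app-node entry registered: nothing to clean up, skip.
        AppNodeLookup::NotFound => {
            eprintln!("app node not found, skipping force unstage of {volume_id}");
            Ok(())
        }
        // Any other lookup failure aborts the publish attempt.
        AppNodeLookup::Failed(error) => Err(error),
        // Node known: issue ForceUnstageVolume to its endpoint before publishing.
        AppNodeLookup::Found(node) => {
            eprintln!("force unstaging {volume_id} on {} ({})", node.id, node.endpoint);
            plugin.force_unstage_volume(volume_id)
        }
    }
}

In the actual change, the controller only proceeds to publish the volume once this cleanup has succeeded or been skipped because no app-node entry exists for the target node.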

Co-authored-by: Abhinandan Purkait <[email protected]>
mayastor-bors and Abhinandan-Purkait committed Dec 12, 2024
2 parents e5e8342 + 827986c commit b3017b7
Showing 8 changed files with 346 additions and 29 deletions.
10 changes: 10 additions & 0 deletions control-plane/csi-driver/proto/node-service.proto
@@ -20,6 +20,8 @@ service NodePlugin {
rpc UnfreezeFS (UnfreezeFSRequest) returns (UnfreezeFSReply) {}
// Find the volume identified by the volume ID, and return volume information.
rpc FindVolume (FindVolumeRequest) returns (FindVolumeReply) {}
// Force unstage the volume.
rpc ForceUnstageVolume (ForceUnstageVolumeRequest) returns (ForceUnstageVolumeReply) {}
}

enum VolumeType {
@@ -54,3 +56,11 @@ message FindVolumeReply {
optional VolumeType volume_type = 1;
string device_path = 2; // the device path for the volume
}

// The request message containing ID of the volume to be force unstaged.
message ForceUnstageVolumeRequest {
string volume_id = 1;
}
// Message for response on volume force unstage.
message ForceUnstageVolumeReply {
}
82 changes: 68 additions & 14 deletions control-plane/csi-driver/src/bin/controller/controller.rs
@@ -4,7 +4,10 @@ use crate::{
};
use csi_driver::{
context::{CreateParams, CreateSnapshotParams, PublishParams, QuiesceFsCandidate},
node::internal::{node_plugin_client::NodePluginClient, FreezeFsRequest, UnfreezeFsRequest},
node::internal::{
node_plugin_client::NodePluginClient, ForceUnstageVolumeRequest, FreezeFsRequest,
UnfreezeFsRequest,
},
};
use rpc::csi::{volume_content_source::Type, Topology as CsiTopology, *};
use stor_port::types::v0::openapi::{
@@ -14,11 +17,12 @@ use stor_port::types::v0::openapi::{
SpecStatus, Volume, VolumeShareProtocol,
},
};
use utils::{dsp_created_by_key, DSP_OPERATOR};
use utils::{dsp_created_by_key, DEFAULT_REQ_TIMEOUT, DSP_OPERATOR};

use regex::Regex;
use std::{collections::HashMap, str::FromStr};
use tonic::{Code, Request, Response, Status};
use std::{collections::HashMap, str::FromStr, time::Duration};
use stor_port::types::v0::openapi::models::AppNode;
use tonic::{transport::Uri, Code, Request, Response, Status};
use tracing::{debug, error, instrument, trace, warn};
use uuid::Uuid;
use volume_capability::AccessType;
@@ -91,12 +95,30 @@ fn volume_app_node(volume: &Volume) -> Option<String> {
}
}

#[tracing::instrument]
/// Create a new endpoint that connects to the provided Uri.
/// This endpoint has default connect and request timeouts.
fn tonic_endpoint(endpoint: String) -> Result<tonic::transport::Endpoint, Status> {
let uri =
Uri::try_from(endpoint).map_err(|error| Status::invalid_argument(error.to_string()))?;

let timeout = humantime::parse_duration(DEFAULT_REQ_TIMEOUT).unwrap();
Ok(tonic::transport::Endpoint::from(uri)
.connect_timeout(timeout)
.timeout(std::time::Duration::from_secs(30))
.http2_keep_alive_interval(Duration::from_secs(5))
.keep_alive_timeout(Duration::from_secs(10))
.concurrency_limit(utils::DEFAULT_GRPC_CLIENT_CONCURRENCY))
}

#[tracing::instrument(err, skip_all)]
async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs freeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.freeze_fs(Request::new(FreezeFsRequest {
volume_id: volume_id.clone(),
@@ -112,12 +134,15 @@ async fn issue_fs_freeze(endpoint: String, volume_id: String) -> Result<(), Status> {
}
}

#[tracing::instrument]
#[tracing::instrument(err, skip_all)]
async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> {
trace!("Issuing fs unfreeze");
let mut client = NodePluginClient::connect(format!("http://{endpoint}"))
let channel = tonic_endpoint(format!("http://{endpoint}"))?
.connect()
.await
.map_err(|error| Status::failed_precondition(error.to_string()))?;
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

match client
.unfreeze_fs(Request::new(UnfreezeFsRequest {
volume_id: volume_id.clone(),
@@ -133,6 +158,28 @@ async fn issue_fs_unfreeze(endpoint: String, volume_id: String) -> Result<(), Status> {
}
}

#[tracing::instrument(err, skip_all)]
async fn force_unstage(app_node: AppNode, volume_id: String) -> Result<(), Status> {
tracing::info!(
"Issuing cleanup for volume: {} to node {}, to endpoint {}",
volume_id,
app_node.id,
app_node.spec.endpoint
);
let channel = tonic_endpoint(format!("http://{}", app_node.spec.endpoint))?
.connect()
.await
.map_err(|error| Status::unavailable(error.to_string()))?;
let mut client = NodePluginClient::new(channel);

client
.force_unstage_volume(Request::new(ForceUnstageVolumeRequest {
volume_id: volume_id.clone(),
}))
.await
.map(|_| ())
}

/// Get share URI for existing volume object and the node where the volume is published.
fn get_volume_share_location(volume: &Volume) -> Option<(String, String)> {
volume
@@ -518,13 +565,12 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
},
}
_ => {

// Check for node being cordoned.
fn cordon_check(spec: Option<&NodeSpec>) -> bool {
if let Some(spec) = spec {
return spec.cordondrainstate.is_some()
return spec.cordondrainstate.is_some();
}
false
}
@@ -538,13 +584,21 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
// then let the control-plane decide where to place the target. Node should not be cordoned.
Ok(node) if node.state.as_ref().map(|n| n.status).unwrap_or(NodeStatus::Unknown) != NodeStatus::Online || cordon_check(node.spec.as_ref()) => {
Ok(None)
},
}
// For 1-replica volumes, don't pre-select the target node. This will allow the
// control-plane to pin the target to the replica node.
Ok(_) if volume.spec.num_replicas == 1 => Ok(None),
Ok(_) => Ok(Some(node_id.as_str())),
}?;

// Issue a cleanup RPC to the csi node to ensure the subsystem doesn't have any stale path present before publishing.
match RestApiClient::get_client().get_app_node(&args.node_id).await {
Ok(app_node) => force_unstage(app_node, volume_id.to_string()).await?,
Err(ApiClientError::ResourceNotExists(..)) => warn!("App node: {}, not found, skipping force unstage volume", args.node_id),
Err(error) => return Err(error.into())
}


// Volume is not published.
let v = RestApiClient::get_client()
.publish_volume(&volume_id, target_node, protocol, args.node_id.clone(), &publish_context)
12 changes: 6 additions & 6 deletions control-plane/csi-driver/src/bin/controller/main.rs
@@ -111,12 +111,12 @@ async fn main() -> anyhow::Result<()> {
.help("Formatting style to be used while logging")
)
.arg(
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
Arg::new("ansi-colors")
.long("ansi-colors")
.default_value("true")
.value_parser(clap::value_parser!(bool))
.help("Enable ansi color for logs")
)
.get_matches();

utils::print_package_info!();
145 changes: 144 additions & 1 deletion control-plane/csi-driver/src/bin/node/findmnt.rs
@@ -4,6 +4,7 @@ use csi_driver::filesystem::FileSystem as Fs;
use serde_json::Value;
use std::{collections::HashMap, process::Command, str::FromStr, string::String, vec::Vec};
use tracing::{error, warn};
use utils::csi_plugin_name;

// Keys of interest we expect to find in the JSON output generated
// by findmnt.
@@ -13,11 +14,20 @@ const FSTYPE_KEY: &str = "fstype";

#[derive(Debug)]
pub(crate) struct DeviceMount {
#[allow(dead_code)]
mount_path: String,
fstype: FileSystem,
}

impl std::fmt::Display for DeviceMount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MountPath: {}, FileSystem: {}",
self.mount_path, self.fstype
)
}
}

impl DeviceMount {
/// create new DeviceMount
pub(crate) fn new(mount_path: String, fstype: FileSystem) -> DeviceMount {
@@ -27,6 +37,10 @@ impl DeviceMount {
pub(crate) fn fstype(&self) -> FileSystem {
self.fstype.clone()
}
/// Get the mount path.
pub(crate) fn mount_path(&self) -> String {
self.mount_path.clone()
}
}

#[derive(Debug)]
@@ -213,3 +227,132 @@ pub(crate) fn get_mountpaths(device_path: &str) -> Result<Vec<DeviceMount>, DeviceError> {
Err(e) => Err(e),
}
}

const DRIVER_NAME: &str = "driverName";
const VOLUME_HANDLE: &str = "volumeHandle";
const METADATA_FILE: &str = "vol_data.json";
const DEVICE_PATTERN: &str = "nvme";

/// This filter is used specifically to search for mountpoints in the context of k8s.
///
/// # Fields
/// * `driver` - Name of the csi driver which provisioned the volume.
/// * `volume_id` - Name of the volume which is being searched for.
/// * `file_name` - Name of the file where kubelet stores the metadata.
/// * `device_pattern` - Pattern to search for a specific type of device, e.g. nvme.
#[derive(Debug)]
struct FilterCsiMounts<'a> {
driver: &'a str,
volume_id: &'a str,
file_name: &'a str,
device_pattern: &'a str,
}

/// Finds CSI mount points using `findmnt` and filters based on the given criteria.
fn find_csi_mount(filter: FilterCsiMounts) -> Result<Vec<HashMap<String, String>>, DeviceError> {
let output = Command::new(FIND_MNT).args(FIND_MNT_ARGS).output()?;

if !output.status.success() {
return Err(DeviceError::new(String::from_utf8(output.stderr)?.as_str()));
}

let json_str = String::from_utf8(output.stdout)?;
let json: Value = serde_json::from_str(&json_str)?;

let mut results: Vec<HashMap<String, String>> = Vec::new();
filter_csi_findmnt(&json, &filter, &mut results);

Ok(results)
}

/// Filters JSON output from `findmnt` for matching CSI mount points as per the filter.
fn filter_csi_findmnt(
json_val: &Value,
filter: &FilterCsiMounts,
results: &mut Vec<HashMap<String, String>>,
) {
match json_val {
Value::Array(json_array) => {
for jsonvalue in json_array {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
Value::Object(json_map) => {
if let Some(source_value) = json_map.get(SOURCE_KEY) {
let source_str = key_adjusted_value(SOURCE_KEY, source_value);
if source_str.contains(filter.device_pattern) {
if let Some(target_value) = json_map.get(TARGET_KEY) {
let target_str = target_value.as_str().unwrap_or_default();

if let Some(parent_path) = std::path::Path::new(target_str).parent() {
let vol_data_path = parent_path.join(filter.file_name);
let vol_data_path_str = vol_data_path.to_string_lossy();

if let Ok(vol_data) = read_vol_data_json(&vol_data_path_str) {
if vol_data.get(DRIVER_NAME)
== Some(&Value::String(filter.driver.to_string()))
&& vol_data.get(VOLUME_HANDLE)
== Some(&Value::String(filter.volume_id.to_string()))
{
results.push(jsonmap_to_hashmap(json_map));
}
}
}
}
}
}

for (_, jsonvalue) in json_map {
if jsonvalue.is_array() || jsonvalue.is_object() {
filter_csi_findmnt(jsonvalue, filter, results);
}
}
}
_ => (),
};
}

/// Reads and parses a file into a JSON object.
fn read_vol_data_json(path: &str) -> Result<serde_json::Map<String, Value>, DeviceError> {
let file = std::fs::File::open(path)?;
let reader = std::io::BufReader::new(file);
let json: serde_json::Map<String, Value> = serde_json::from_reader(reader)?;
Ok(json)
}

/// Retrieves mount paths for a given CSI volume ID by parsing the metadata file.
pub(crate) async fn get_csi_mountpaths(volume_id: &str) -> Result<Vec<DeviceMount>, DeviceError> {
let filter = FilterCsiMounts {
driver: &csi_plugin_name(),
volume_id,
file_name: METADATA_FILE,
device_pattern: DEVICE_PATTERN,
};
match find_csi_mount(filter) {
Ok(results) => {
let mut mountpaths: Vec<DeviceMount> = Vec::new();
for entry in results {
if let Some(mountpath) = entry.get(TARGET_KEY) {
if let Some(fstype) = entry.get(FSTYPE_KEY) {
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::from_str(fstype)
.unwrap_or(Fs::Unsupported(fstype.to_string()))
.into(),
))
} else {
warn!(volume.id=%volume_id, "Missing fstype for {mountpath}");
mountpaths.push(DeviceMount::new(
mountpath.to_string(),
Fs::Unsupported("".to_string()).into(),
))
}
} else {
warn!(volume.id=%volume_id, ?entry, "Missing target field");
}
}
Ok(mountpaths)
}
Err(e) => Err(e),
}
}