Skip to content

Commit

Permalink
chore(bors): merge pull request #886
Browse files Browse the repository at this point in the history
886: feat(csi-driver): make csi-driver operations async r=Abhinandan-Purkait a=Abhinandan-Purkait

- Change the mount, unmount and nvme operations over to use `spawn_blocking`.
- Remove async_stream for grpc over uds for controller and node.

Co-authored-by: Abhinandan Purkait <[email protected]>
  • Loading branch information
mayastor-bors and Abhinandan-Purkait committed Dec 4, 2024
2 parents 1bdbfcb + 44ca573 commit 1a609f9
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 293 deletions.
79 changes: 7 additions & 72 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,12 @@
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
ops::Add,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tracing::{debug, error, info};

/// Newtype around [`tokio::net::UnixStream`] that carries this file's `Connected`,
/// `AsyncRead` and `AsyncWrite` implementations.
#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl Connected for UnixStream {
    type ConnectInfo = UdsConnectInfo;

    /// Captures the peer address and peer credentials of the wrapped socket.
    ///
    /// Either lookup may fail; a failure is recorded as `None` rather than reported.
    fn connect_info(&self) -> Self::ConnectInfo {
        let peer_addr = self.0.peer_addr().ok().map(Arc::new);
        let peer_cred = self.0.peer_cred().ok();
        UdsConnectInfo {
            peer_addr,
            peer_cred,
        }
    }
}

/// Connection metadata produced by `UnixStream::connect_info`.
///
/// Each field is `None` when the corresponding lookup on the socket fails.
// Not sure why we need the inner fields, probably worth checking if we can remove them.
#[derive(Clone, Debug)]
// Nothing visible in this file reads the fields back, hence the lint allowance.
#[allow(unused)]
struct UdsConnectInfo {
/// Address of the connected peer, if it could be obtained.
peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
/// Credentials of the connected peer, if they could be obtained.
peer_cred: Option<tokio::net::unix::UCred>,
}

impl AsyncRead for UnixStream {
    /// Forwards the read to the wrapped `tokio::net::UnixStream`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Re-pin the inner stream so the call is delegated unchanged.
        let inner = Pin::new(&mut self.get_mut().0);
        inner.poll_read(cx, buf)
    }
}

impl AsyncWrite for UnixStream {
    /// Forwards the write to the wrapped `tokio::net::UnixStream`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let inner = Pin::new(&mut self.get_mut().0);
        inner.poll_write(cx, buf)
    }

    /// Forwards the flush to the wrapped stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().0);
        inner.poll_flush(cx)
    }

    /// Forwards the shutdown to the wrapped stream.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let inner = Pin::new(&mut self.get_mut().0);
        inner.poll_shutdown(cx)
    }
}

pub(super) struct CsiServer {}
impl CsiServer {
/// Runs the CSI Server identity and controller services.
Expand All @@ -87,11 +27,11 @@ impl CsiServer {
}
}

debug!("CSI RPC server is listening on {}", csi_socket);

let incoming = {
let uds = UnixListener::bind(csi_socket)?;

info!("CSI RPC server is listening on {csi_socket}");

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
if let Err(e) = fs::set_permissions(
Expand All @@ -103,12 +43,7 @@ impl CsiServer {
debug!("Successfully changed file permissions for CSI socket");
}

async_stream::stream! {
loop {
let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await;
yield item;
}
}
UnixListenerStream::new(uds)
};

let cfg = crate::CsiControllerConfig::get_config();
Expand Down
6 changes: 3 additions & 3 deletions control-plane/csi-driver/src/bin/node/block_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}

if let Err(error) =
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly)
mount::blockdevice_mount(&device_path, target_path.as_str(), msg.readonly).await
{
return Err(failure!(
Code::Internal,
Expand All @@ -108,14 +108,14 @@ pub(crate) async fn publish_block_volume(msg: &NodePublishVolumeRequest) -> Resu
}
}

pub(crate) fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_block_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;

// block volumes are mounted on block special file, which is not
// a regular file.
if mount::find_mount(None, Some(target_path)).is_some() {
match mount::blockdevice_unmount(target_path) {
match mount::blockdevice_unmount(target_path).await {
Ok(_) => {}
Err(err) => {
return Err(Status::new(
Expand Down
35 changes: 22 additions & 13 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
dev::util::extract_uuid,
match_dev::match_nvmf_device,
node::RDMA_CONNECT_CHECK,
runtime,
};

use super::{Attach, Detach, DeviceError, DeviceName};
Expand Down Expand Up @@ -206,12 +207,17 @@ impl Attach for NvmfAttach {
.hostnqn(self.hostnqn.clone())
.keep_alive_tmo(self.keep_alive_tmo)
.build()?;
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}

runtime::spawn_blocking(move || {
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
})
.await
.map_err(|error| DeviceError::from(error.to_string()))?
}
Err(err) => Err(err.into()),
}
Expand Down Expand Up @@ -295,14 +301,17 @@ impl NvmfDetach {
#[tonic::async_trait]
impl Detach for NvmfDetach {
async fn detach(&self) -> Result<(), DeviceError> {
if disconnect(&self.nqn)? == 0 {
return Err(DeviceError::from(format!(
let nqn = self.nqn.clone();
runtime::spawn_blocking(move || match disconnect(&nqn) {
Ok(0) => Err(DeviceError::from(format!(
"nvmf disconnect {} failed: no device found",
self.nqn
)));
}

Ok(())
nqn
))),
Err(error) => Err(error.into()),
Ok(_) => Ok(()),
})
.await
.map_err(|error| DeviceError::from(error.to_string()))?
}

fn devname(&self) -> DeviceName {
Expand Down
16 changes: 8 additions & 8 deletions control-plane/csi-driver/src/bin/node/filesystem_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(crate) trait FileSystemOps: Send + Sync {
/// Get the default mount options along with the user passed options for specific filesystems.
fn mount_flags(&self, mount_flags: Vec<String>) -> Vec<String>;
/// Unmount the filesystem if the filesystem uuid and the provided uuid differ.
fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl FileSystemOps for Ext4Fs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
_device_path: &str,
_fs_staging_path: &str,
Expand Down Expand Up @@ -270,13 +270,13 @@ impl FileSystemOps for XFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// Xfs filesystem needs an unmount to clear the log, so that the parameters can be changed.
Expand All @@ -288,12 +288,12 @@ impl FileSystemOps for XFs {
options: &[String],
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).map_err(|error| {
mount::filesystem_mount(device, staging_path, &FileSystem(Fs::Xfs), options).await.map_err(|error| {
format!(
"(xfs repairing) Failed to mount device {device} onto {staging_path} for {volume_uuid} : {error}",
)
})?;
mount::filesystem_unmount(staging_path).map_err(|error| {
mount::filesystem_unmount(staging_path).await.map_err(|error| {
format!(
"(xfs repairing) Failed to unmount device {device} from {staging_path} for {volume_uuid} : {error}",
)
Expand Down Expand Up @@ -358,13 +358,13 @@ impl FileSystemOps for BtrFs {
mount_flags
}

fn unmount_on_fs_id_diff(
async fn unmount_on_fs_id_diff(
&self,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<(), Error> {
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
mount::unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid).await
}

/// `btrfs check --readonly` is not a `DANGEROUS OPTION` as it only exists to calm potential
Expand Down
26 changes: 14 additions & 12 deletions control-plane/csi-driver/src/bin/node/filesystem_vol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub(crate) async fn stage_fs_volume(
// If clone's fs id change was requested and we were not able to change it in first attempt
// unmount and continue the stage again.
let continue_stage = if fs_id.is_some() {
continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid)
continue_after_unmount_on_fs_id_diff(fstype ,device_path, fs_staging_path, &volume_uuid).await
.map_err(|error| {
failure!(
Code::FailedPrecondition,
Expand All @@ -107,7 +107,7 @@ pub(crate) async fn stage_fs_volume(
if !continue_stage {
// todo: validate other flags?
if mnt.mount_flags.readonly() != existing.options.readonly() {
mount::remount(fs_staging_path, mnt.mount_flags.readonly())?;
mount::remount(fs_staging_path, mnt.mount_flags.readonly()).await?;
}

return Ok(());
Expand Down Expand Up @@ -161,7 +161,8 @@ pub(crate) async fn stage_fs_volume(

debug!("Mounting device {} onto {}", device_path, fs_staging_path);

if let Err(error) = mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags)
if let Err(error) =
mount::filesystem_mount(device_path, fs_staging_path, fstype, &mount_flags).await
{
return Err(failure!(
Code::Internal,
Expand Down Expand Up @@ -209,7 +210,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result<
));
}

if let Err(error) = mount::filesystem_unmount(fs_staging_path) {
if let Err(error) = mount::filesystem_unmount(fs_staging_path).await {
return Err(failure!(
Code::Internal,
"Failed to unstage volume {}: failed to unmount device {:?} from {}: {}",
Expand All @@ -227,7 +228,7 @@ pub(crate) async fn unstage_fs_volume(msg: &NodeUnstageVolumeRequest) -> Result<
}

/// Publish a filesystem volume
pub(crate) fn publish_fs_volume(
pub(crate) async fn publish_fs_volume(
msg: &NodePublishVolumeRequest,
mnt: &MountVolume,
filesystems: &[FileSystem],
Expand Down Expand Up @@ -328,7 +329,7 @@ pub(crate) fn publish_fs_volume(

debug!("Mounting {} to {}", fs_staging_path, target_path);

if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false) {
if let Err(error) = mount::bind_mount(fs_staging_path, target_path, false).await {
return Err(failure!(
Code::Internal,
"Failed to publish volume {}: failed to mount {} to {}: {}",
Expand All @@ -345,7 +346,7 @@ pub(crate) fn publish_fs_volume(

debug!("Remounting {} as readonly", target_path);

if let Err(error) = mount::bind_remount(target_path, &options) {
if let Err(error) = mount::bind_remount(target_path, &options).await {
let message = format!(
"Failed to publish volume {volume_id}: failed to mount {fs_staging_path} to {target_path} as readonly: {error}"
);
Expand All @@ -354,7 +355,7 @@ pub(crate) fn publish_fs_volume(

debug!("Unmounting {}", target_path);

if let Err(error) = mount::bind_unmount(target_path) {
if let Err(error) = mount::bind_unmount(target_path).await {
error!("Failed to unmount {}: {}", target_path, error);
}

Expand All @@ -367,7 +368,7 @@ pub(crate) fn publish_fs_volume(
Ok(())
}

pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
pub(crate) async fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<(), Status> {
// filesystem mount
let target_path = &msg.target_path;
let volume_id = &msg.volume_id;
Expand Down Expand Up @@ -398,7 +399,7 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<()

debug!("Unmounting {}", target_path);

if let Err(error) = mount::bind_unmount(target_path) {
if let Err(error) = mount::bind_unmount(target_path).await {
return Err(failure!(
Code::Internal,
"Failed to unpublish volume {}: failed to unmount {}: {}",
Expand Down Expand Up @@ -427,14 +428,15 @@ pub(crate) fn unpublish_fs_volume(msg: &NodeUnpublishVolumeRequest) -> Result<()

/// Check if we can continue the staging in case the fs id change failed midway and we want to
/// retry the flow.
fn continue_after_unmount_on_fs_id_diff(
async fn continue_after_unmount_on_fs_id_diff(
fstype: &FileSystem,
device_path: &str,
fs_staging_path: &str,
volume_uuid: &Uuid,
) -> Result<bool, String> {
fstype
.fs_ops()?
.unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)?;
.unmount_on_fs_id_diff(device_path, fs_staging_path, volume_uuid)
.await?;
Ok(fstype == &Fs::Xfs.into())
}
1 change: 1 addition & 0 deletions control-plane/csi-driver/src/bin/node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod nodeplugin_nvme;
#[cfg(target_os = "linux")]
mod nodeplugin_svc;
mod registration;
mod runtime;
/// Shutdown event which lets the plugin know it needs to stop processing new events and
/// complete any existing ones before shutting down.
#[cfg(target_os = "linux")]
Expand Down
Loading

0 comments on commit 1a609f9

Please sign in to comment.