diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fc84aa0e5..a2387aa38 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,14 +20,14 @@ repos: - id: rust-lint name: Rust lint description: Run cargo clippy on files included in the commit. clippy should be installed before-hand. - entry: nix-shell --pure --run 'cargo-clippy --all --all-targets -- -D warnings' + entry: nix-shell --pure --run './scripts/rust/linter.sh clippy' pass_filenames: false types: [file, rust] language: system - id: rust-style name: Rust style description: Run cargo fmt on files included in the commit. rustfmt should be installed before-hand. - entry: nix-shell --pure --run 'cargo-fmt --all -- --check' + entry: nix-shell --pure --run './scripts/rust/linter.sh fmt' exclude: openapi/ pass_filenames: false types: [file, rust] diff --git a/Jenkinsfile b/Jenkinsfile index 251d1a80e..0ec3d4512 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -115,8 +115,7 @@ pipeline { steps { sh 'printenv' sh 'nix-shell --run "./scripts/rust/generate-openapi-bindings.sh"' - sh 'nix-shell --run "cargo fmt --all -- --check"' - sh 'nix-shell --run "cargo clippy --all-targets -- -D warnings"' + sh 'nix-shell --run "./scripts/rust/linter.sh"' sh 'nix-shell --run "black --check tests/bdd"' sh 'nix-shell --run "./scripts/git/check-submodule-branches.sh"' } diff --git a/control-plane/csi-driver/src/bin/controller/server.rs b/control-plane/csi-driver/src/bin/controller/server.rs index acebfcd21..a16203200 100644 --- a/control-plane/csi-driver/src/bin/controller/server.rs +++ b/control-plane/csi-driver/src/bin/controller/server.rs @@ -3,8 +3,7 @@ use rpc::csi::{controller_server::ControllerServer, identity_server::IdentitySer use std::{fs, io::ErrorKind, ops::Add}; use tokio::net::UnixListener; -use tonic::codegen::tokio_stream::wrappers::UnixListenerStream; -use tonic::transport::Server; +use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server}; use tracing::{debug, error, info}; pub(super) struct CsiServer {} diff --git a/control-plane/csi-driver/src/bin/node/client.rs b/control-plane/csi-driver/src/bin/node/client.rs index c95853d6a..77bddc74d 100644 --- a/control-plane/csi-driver/src/bin/node/client.rs +++ b/control-plane/csi-driver/src/bin/node/client.rs @@ -5,9 +5,15 @@ use stor_port::types::v0::openapi::{ models::{RegisterAppNode, RestJsonError}, }; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; use std::{collections::HashMap, sync::Arc, time::Duration}; -use stor_port::types::v0::openapi::apis::app_nodes_api::tower::client::direct::AppNodes; +use stor_port::types::v0::openapi::{ + apis::{ + app_nodes_api::tower::client::direct::AppNodes, + volumes_api::tower::client::{direct::Volumes, VolumesClient}, + }, + models::NexusState, +}; use tonic::Status; use tracing::info; @@ -92,9 +98,6 @@ impl From> for ApiClientError { } } -/// Default rest api timeout for requests. -const DEFAULT_TIMEOUT_FOR_REST_REQUESTS: Duration = Duration::from_secs(5); - /// Wrapper for AppNodes REST API client. pub(crate) struct AppNodesClientWrapper { client: AppNodesClient, @@ -105,26 +108,24 @@ impl AppNodesClientWrapper { pub(crate) fn initialize( endpoint: Option<&String>, ) -> anyhow::Result> { + const REST_TIMEOUT: Duration = Duration::from_secs(5); + let Some(endpoint) = endpoint else { return Ok(None); }; let url = clients::tower::Url::parse(endpoint) - .map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?; + .map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?; let tower = clients::tower::Configuration::builder() - .with_timeout(DEFAULT_TIMEOUT_FOR_REST_REQUESTS) + .with_timeout(REST_TIMEOUT) .build_url(url) .map_err(|error| { - anyhow::anyhow!( - "Failed to create openapi configuration, Error: '{:?}'", - error - ) + anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'") })?; info!( - "API client is initialized with endpoint {}, request timeout = {:?}", - endpoint, DEFAULT_TIMEOUT_FOR_REST_REQUESTS, + "API client is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}" ); Ok(Some(Self { @@ -159,3 +160,57 @@ impl AppNodesClientWrapper { Ok(()) } } + +/// Wrapper for Volumes REST API client. +#[derive(Clone)] +pub(crate) struct VolumesClientWrapper { + client: VolumesClient, +} + +impl VolumesClientWrapper { + /// Initialize VolumesClientWrapper instance. + pub(crate) fn new(endpoint: &str) -> anyhow::Result { + /// TODO: what's the NodeStage timeout? + const REST_TIMEOUT: Duration = Duration::from_secs(10); + + let url = clients::tower::Url::parse(endpoint) + .map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?; + + let config = clients::tower::Configuration::builder() + .with_timeout(REST_TIMEOUT) + .with_concurrency_limit(Some(10)) + .build_url(url) + .map_err(|error| { + anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'") + })?; + + info!( + "VolumesClient API is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}" + ); + + Ok(Self { + client: VolumesClient::new(Arc::new(config)), + }) + } + + /// Get the target URI for the given volume. + pub(crate) async fn volume_uri( + &self, + volume_id: &uuid::Uuid, + ) -> Result { + let volume: stor_port::types::v0::openapi::models::Volume = + self.client.get_volume(volume_id).await?; + let Some(target) = volume.state.target else { + return Err(ApiClientError::Unavailable( + "Volume target is not available".into(), + )); + }; + if !matches!(target.state, NexusState::Online | NexusState::Degraded) { + return Err(ApiClientError::Unavailable( + "Volume target is not ready for I/O".into(), + )); + } + // TODO: check for other volume statuses, example ONLINE? + Ok(target.device_uri) + } +} diff --git a/control-plane/csi-driver/src/bin/node/main_.rs b/control-plane/csi-driver/src/bin/node/main_.rs index bc531692e..4cc21e348 100644 --- a/control-plane/csi-driver/src/bin/node/main_.rs +++ b/control-plane/csi-driver/src/bin/node/main_.rs @@ -20,7 +20,7 @@ use grpc::csi_node_nvme::nvme_operations_server::NvmeOperationsServer; use stor_port::platform; use utils::tracing_telemetry::{FmtLayer, FmtStyle}; -use crate::client::AppNodesClientWrapper; +use crate::client::{AppNodesClientWrapper, VolumesClientWrapper}; use clap::Arg; use serde_json::json; use std::{ @@ -32,8 +32,7 @@ use std::{ str::FromStr, }; use tokio::net::UnixListener; -use tonic::codegen::tokio_stream::wrappers::UnixListenerStream; -use tonic::transport::Server; +use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server}; use tracing::{debug, error, info}; const GRPC_PORT: u16 = 50051; @@ -58,6 +57,14 @@ pub(super) async fn main() -> anyhow::Result<()> { .requires("rest-endpoint") .help("Enable registration of the csi node with the control plane") ) + .arg( + Arg::new("enable-rest") + .long("enable-rest") + .action(clap::ArgAction::SetTrue) + .value_name("BOOLEAN") + .requires("rest-endpoint") + .help("Enhance csi with added rest functionality") + ) .arg( Arg::new("csi-socket") .short('c') @@ -322,10 +329,10 @@ pub(super) async fn main() -> anyhow::Result<()> { .unwrap_or("/var/tmp/csi.sock"); // Remove stale CSI socket from previous instance if there is any. match fs::remove_file(csi_socket) { - Ok(_) => info!("Removed stale CSI socket {}", csi_socket), + Ok(_) => info!("Removed stale CSI socket {csi_socket}"), Err(err) => { if err.kind() != ErrorKind::NotFound { - anyhow::bail!("Error removing stale CSI socket {}: {}", csi_socket, err); + anyhow::bail!("Error removing stale CSI socket {csi_socket}: {err}"); } } } @@ -371,22 +378,35 @@ impl CsiServer { let incoming = { let uds = UnixListener::bind(csi_socket).unwrap(); - info!("CSI plugin bound to {}", csi_socket); + info!("CSI plugin bound to {csi_socket}"); // Change permissions on CSI socket to allow non-privileged clients to access it // to simplify testing. - if let Err(e) = fs::set_permissions( + if let Err(error) = fs::set_permissions( csi_socket, std::os::unix::fs::PermissionsExt::from_mode(0o777), ) { - error!("Failed to change permissions for CSI socket: {:?}", e); + error!("Failed to change permissions for CSI socket: {error:?}"); } else { debug!("Successfully changed file permissions for CSI socket"); } UnixListenerStream::new(uds) }; - let node = Node::new(node_name.into(), node_selector, probe_filesystems()); + let vol_client = match cli_args.get_one::("rest-endpoint") { + Some(ep) if cli_args.get_flag("enable-rest") => Some(VolumesClientWrapper::new(ep)?), + _ => { + tracing::warn!("The rest client is not enabled - functionality may be limited"); + None + } + }; + + let node = Node::new( + node_name.into(), + node_selector, + probe_filesystems(), + vol_client, + ); Ok(async move { Server::builder() .add_service(NodeServer::new(node)) diff --git a/control-plane/csi-driver/src/bin/node/mount.rs b/control-plane/csi-driver/src/bin/node/mount.rs index e3a490d49..dfc526c43 100644 --- a/control-plane/csi-driver/src/bin/node/mount.rs +++ b/control-plane/csi-driver/src/bin/node/mount.rs @@ -1,6 +1,5 @@ //! Utility functions for mounting and unmounting filesystems. -use crate::filesystem_ops::FileSystem; -use crate::runtime; +use crate::{filesystem_ops::FileSystem, runtime}; use csi_driver::filesystem::FileSystem as Fs; use devinfo::mountinfo::{MountInfo, SafeMountIter}; diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs index 3f333ed5e..70f54310f 100644 --- a/control-plane/csi-driver/src/bin/node/node.rs +++ b/control-plane/csi-driver/src/bin/node/node.rs @@ -1,5 +1,6 @@ use crate::{ block_vol::{publish_block_volume, unpublish_block_volume}, + client::VolumesClientWrapper, dev::{sysfs_dev_size, Device}, filesystem_ops::FileSystem, filesystem_vol::{publish_fs_volume, stage_fs_volume, unpublish_fs_volume, unstage_fs_volume}, @@ -36,11 +37,12 @@ macro_rules! failure { } /// The Csi Node implementation. -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct Node { node_name: String, node_selector: HashMap, filesystems: Vec, + vol_client: Option, } impl Node { @@ -49,11 +51,13 @@ impl Node { node_name: String, node_selector: HashMap, filesystems: Vec, + vol_client: Option, ) -> Node { let self_ = Self { node_name, node_selector, filesystems, + vol_client, }; info!("Node topology segments: {:?}", self_.segments()); self_ @@ -73,6 +77,32 @@ impl Node { .chain(vec![self.node_name_segment()]) .collect() } + + /// Get the target URI for the given volume. + /// If the REST volume client is enabled, use it to fetch the volume and its target URI. + /// Otherwise, use the publish context. + async fn volume_uri( + &self, + volume_id: &Uuid, + context: &HashMap, + ) -> Result { + let ctx_uri = context.get("uri").ok_or_else(|| { + failure!( + Code::InvalidArgument, + "Failed to stage volume {volume_id}: URI attribute missing from publish context" + ) + })?; + + let Some(client) = &self.vol_client else { + return Ok(ctx_uri.to_string()); + }; + + let uri = client.volume_uri(volume_id).await?; + if &uri != ctx_uri { + tracing::warn!(volume.uri=%uri, ctx.uri=%ctx_uri, "Overriding URI volume context with URI from REST"); + } + Ok(uri) + } } const ATTACH_TIMEOUT_INTERVAL: Duration = Duration::from_millis(100); @@ -670,14 +700,6 @@ impl node_server::Node for Node { } }; - let uri = msg.publish_context.get("uri").ok_or_else(|| { - failure!( - Code::InvalidArgument, - "Failed to stage volume {}: URI attribute missing from publish context", - &msg.volume_id - ) - })?; - let uuid = Uuid::parse_str(&msg.volume_id).map_err(|error| { failure!( Code::Internal, @@ -688,13 +710,21 @@ impl node_server::Node for Node { })?; let _guard = VolumeOpGuard::new(uuid)?; + // We cannot fully rely on the URI stored in the `publish_context` because it may change + // if the target gets moved to another node. + // It's possible that a pod gets restaged on the same node, and thus skipping the controller + // publish call, leaving us with a bad URI: + // https://github.com/openebs/mayastor/issues/1781 + // In order to fix this issue we need to fetch the volume uri from the control-plane. + let uri = self.volume_uri(&uuid, &msg.publish_context).await?; + // Note checking existence of staging_target_path, is delegated to // code handling those volume types where it is relevant. // All checks complete, now attach, if not attached already. debug!("Volume {} has URI {}", &msg.volume_id, uri); - let mut device = Device::parse(uri).map_err(|error| { + let mut device = Device::parse(&uri).map_err(|error| { failure!( Code::Internal, "Failed to stage volume {}: error parsing URI {}: {}", diff --git a/deployer/src/infra/csi-driver/node.rs b/deployer/src/infra/csi-driver/node.rs index 78db36164..f683f5670 100644 --- a/deployer/src/infra/csi-driver/node.rs +++ b/deployer/src/infra/csi-driver/node.rs @@ -42,6 +42,7 @@ impl ComponentAction for CsiNode { i, cfg, options.enable_app_node_registration, + options.csi_node_rest, options.io_engine_cores, )?; } @@ -111,6 +112,7 @@ impl CsiNode { index: u32, cfg: Builder, enable_registration: bool, + enable_rest: bool, io_queues: u32, ) -> Result { let container_name = Self::container_name(index); @@ -123,6 +125,7 @@ impl CsiNode { &socket, cfg, enable_registration, + enable_rest, io_queues, ) } @@ -137,6 +140,7 @@ impl CsiNode { &socket, cfg, options.enable_app_node_registration, + options.csi_node_rest, options.io_engine_cores, ) } @@ -145,7 +149,8 @@ impl CsiNode { node_name: &str, socket: &str, cfg: Builder, - enable_registation: bool, + enable_registration: bool, + enable_rest: bool, io_queues: u32, ) -> Result { let io_queues = io_queues.max(1); @@ -156,7 +161,14 @@ impl CsiNode { // regardless of what its default value is. .with_args(vec!["--csi-socket", socket]); - binary = if enable_registation { + if enable_registration || enable_rest { + binary = binary.with_args(vec!["--rest-endpoint", "http://rest:8081"]); + } + if enable_rest { + binary = binary.with_args(vec!["--enable-rest"]); + } + + binary = if enable_registration { let endpoint = format!("{}:50055", cfg.next_ip_for_name(container_name)?); binary .with_args(vec!["--enable-registration"]) diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index c92a87589..460490f2c 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -123,6 +123,10 @@ pub struct StartOptions { #[clap(long)] pub csi_node: bool, + /// Enable the rest client in the csi-node plugin. + #[clap(long, requires = "csi_node")] + pub csi_node_rest: bool, + /// Use `N` csi-node instances #[clap(long, requires = "csi_node")] pub app_nodes: Option, diff --git a/scripts/rust/linter.sh b/scripts/rust/linter.sh index 8f5c8529e..37e429e6e 100755 --- a/scripts/rust/linter.sh +++ b/scripts/rust/linter.sh @@ -1,7 +1,31 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh -cargo fmt -- --version -cargo fmt --all -- --config imports_granularity=Crate +set -e + +FMT_ERROR= + +OP="${1:-}" + +case "$OP" in + "" | "fmt" | "clippy") + ;; + *) + echo "linter $OP not supported" + exit 2 +esac +cargo fmt -- --version cargo clippy -- --version -cargo clippy --all --all-targets $1 -- -D warnings + +if [ -z "$OP" ] || [ "$OP" = "fmt" ]; then + cargo fmt --all --check || FMT_ERROR=$? + if [ -n "$FMT_ERROR" ]; then + cargo fmt --all + fi +fi + +if [ -z "$OP" ] || [ "$OP" = "clippy" ]; then + cargo clippy --all --all-targets -- -D warnings +fi + +exit ${FMT_ERROR:-0} diff --git a/tests/bdd/common/deployer.py b/tests/bdd/common/deployer.py index d129636b9..e6a597128 100644 --- a/tests/bdd/common/deployer.py +++ b/tests/bdd/common/deployer.py @@ -16,6 +16,7 @@ class StartOptions: wait: str = "10s" csi_controller: bool = False csi_node: bool = False + csi_rest: bool = False reconcile_period: str = "" faulted_child_wait_period: str = "" cache_period: str = "" @@ -51,6 +52,8 @@ def args(self): args.append("--csi-controller") if self.csi_node: args.append("--csi-node") + if self.csi_rest: + args.append("--csi-node-rest") if self.jaeger: args.append("--jaeger") if len(self.reconcile_period) > 0: @@ -124,6 +127,7 @@ def start( wait="10s", csi_controller=False, csi_node=False, + csi_rest=False, reconcile_period="", faulted_child_wait_period="", cache_period="", @@ -151,6 +155,7 @@ def start( wait, csi_controller=csi_controller, csi_node=csi_node, + csi_rest=csi_rest, reconcile_period=reconcile_period, faulted_child_wait_period=faulted_child_wait_period, cache_period=cache_period, diff --git a/tests/bdd/features/csi/node/node.feature b/tests/bdd/features/csi/node/node.feature index 7aef8c7b1..246f88dbb 100644 --- a/tests/bdd/features/csi/node/node.feature +++ b/tests/bdd/features/csi/node/node.feature @@ -29,6 +29,11 @@ Feature: CSI node plugin When staging a volume with a volume_capability with a mount with an unsupported fs_type Then the request should fail + Scenario: stage volume request with incorrect uri in the publish context + When staging a volume with an incorrect uri + But the rest client is enabled + Then the request should succeed + Scenario: staging a single writer volume When staging an "ext4" volume as "MULTI_NODE_SINGLE_WRITER" Then the request should succeed diff --git a/tests/bdd/features/csi/node/test_node.py b/tests/bdd/features/csi/node/test_node.py index b6f96108a..3c4a1cdbd 100644 --- a/tests/bdd/features/csi/node/test_node.py +++ b/tests/bdd/features/csi/node/test_node.py @@ -106,7 +106,7 @@ def get_volume_capability(volume, read_only): @pytest.fixture(scope="module") def setup(): - Deployer.start(1, csi_node=True) + Deployer.start(1, csi_node=True, csi_rest=True) # Create 2 pools. pool_labels = disk_pool_label @@ -237,6 +237,13 @@ def test_stage_volume_request_with_unsupported_fs_type(): """Stage volume request with unsupported fs_type.""" +@scenario( + "node.feature", "stage volume request with incorrect uri in the publish context" +) +def test_stage_volume_request_with_incorrect_uri_in_the_publish_context(): + """stage volume request with incorrect uri in the publish context.""" + + @scenario("node.feature", "stage volume request without specified access_mode") def test_stage_volume_request_without_specified_access_mode(): """Stage volume request without specified access_mode.""" @@ -624,6 +631,37 @@ def attempt_to_stage_volume_with_unsupported_fs_type( assert error.value.code() == grpc.StatusCode.INVALID_ARGUMENT +@when("staging a volume with an incorrect uri") +def _( + get_published_nexus, csi_instance, staging_target_path, io_timeout, staged_volumes +): + nexus = get_published_nexus + volume = Volume( + nexus.uuid, + nexus.protocol, + nexus.uri, + "MULTI_NODE_SINGLE_WRITER", + staging_target_path, + "ext4", + ) + csi_instance.node.NodeStageVolume( + pb.NodeStageVolumeRequest( + volume_id=nexus.uuid, + publish_context={"uri": "bad", "ioTimeout": io_timeout}, + staging_target_path=staging_target_path, + volume_capability=pb.VolumeCapability( + access_mode=pb.VolumeCapability.AccessMode( + mode=pb.VolumeCapability.AccessMode.Mode.MULTI_NODE_SINGLE_WRITER + ), + mount=pb.VolumeCapability.MountVolume(fs_type="ext4", mount_flags=[]), + ), + secrets={}, + volume_context={}, + ) + ) + staged_volumes[volume.uuid] = volume + + @when(parsers.parse('staging an "{fs_type}" volume as "{mode}"')) def stage_new_volume( get_published_nexus, stage_volume, staging_target_path, fs_type, mode @@ -775,6 +813,12 @@ def _(generic_staged_volume): wait_nvme_gone_device(uri) +@when("the rest client is enabled") +def _(): + """the rest client is enabled.""" + pass + + @then("the volume should be stageable again") def _(publish_nexus, generic_staged_volume, stage_volume): """the volume should be stageable again."""