Skip to content

Commit

Permalink
chore(bors): merge pull request #903
Browse files Browse the repository at this point in the history
903: fix(csi/stage): fetch uri via REST r=tiagolobocastro a=tiagolobocastro

On a NodeStage call, it's possible that the publish_context URI is out of date. This can happen when the volume has been moved to another node, and the app pod is pinned to a node and the node restarts.
See Mayastor Issue 1781 for more details.

For this fix, we add a new option to enable rest client for the csi-node. Ideally we'd like to strictly adhere to CSI spec, and avoid doing any mayastor specific operations (effectively being a mostly generic nvme-connect csi-driver but the immutability of the publish_context makes this a bit difficult.

Anyways, we add a new flag --enable-rest which is optional, thus still allowing us to run without this layer.
This will be enabled by default on the helm chart.

We also further check for the Nexus Online/Degraded state, which should help avoid a bunch of nvme connect errors in the kernel.

With this, we can also improve a few things down the line, such as ensuring resize before publish, etc... but we should take these 1 at a time and not suddendly do everything via rest...

Co-authored-by: Tiago Castro <[email protected]>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Dec 11, 2024
2 parents b49a4a4 + c9ed89c commit 15c9918
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 47 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ repos:
- id: rust-lint
name: Rust lint
description: Run cargo clippy on files included in the commit. clippy should be installed before-hand.
entry: nix-shell --pure --run 'cargo-clippy --all --all-targets -- -D warnings'
entry: nix-shell --pure --run './scripts/rust/linter.sh clippy'
pass_filenames: false
types: [file, rust]
language: system
- id: rust-style
name: Rust style
description: Run cargo fmt on files included in the commit. rustfmt should be installed before-hand.
entry: nix-shell --pure --run 'cargo-fmt --all -- --check'
entry: nix-shell --pure --run './scripts/rust/linter.sh fmt'
exclude: openapi/
pass_filenames: false
types: [file, rust]
Expand Down
3 changes: 1 addition & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ pipeline {
steps {
sh 'printenv'
sh 'nix-shell --run "./scripts/rust/generate-openapi-bindings.sh"'
sh 'nix-shell --run "cargo fmt --all -- --check"'
sh 'nix-shell --run "cargo clippy --all-targets -- -D warnings"'
sh 'nix-shell --run "./scripts/rust/linter.sh"'
sh 'nix-shell --run "black --check tests/bdd"'
sh 'nix-shell --run "./scripts/git/check-submodule-branches.sh"'
}
Expand Down
3 changes: 1 addition & 2 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use rpc::csi::{controller_server::ControllerServer, identity_server::IdentitySer

use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

pub(super) struct CsiServer {}
Expand Down
81 changes: 68 additions & 13 deletions control-plane/csi-driver/src/bin/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ use stor_port::types::v0::openapi::{
models::{RegisterAppNode, RestJsonError},
};

use anyhow::{anyhow, Result};
use anyhow::anyhow;
use std::{collections::HashMap, sync::Arc, time::Duration};
use stor_port::types::v0::openapi::apis::app_nodes_api::tower::client::direct::AppNodes;
use stor_port::types::v0::openapi::{
apis::{
app_nodes_api::tower::client::direct::AppNodes,
volumes_api::tower::client::{direct::Volumes, VolumesClient},
},
models::NexusState,
};
use tonic::Status;
use tracing::info;

Expand Down Expand Up @@ -92,9 +98,6 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
}
}

/// Default rest api timeout for requests.
const DEFAULT_TIMEOUT_FOR_REST_REQUESTS: Duration = Duration::from_secs(5);

/// Wrapper for AppNodes REST API client.
pub(crate) struct AppNodesClientWrapper {
client: AppNodesClient,
Expand All @@ -105,26 +108,24 @@ impl AppNodesClientWrapper {
pub(crate) fn initialize(
endpoint: Option<&String>,
) -> anyhow::Result<Option<AppNodesClientWrapper>> {
const REST_TIMEOUT: Duration = Duration::from_secs(5);

let Some(endpoint) = endpoint else {
return Ok(None);
};

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?;
.map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?;

let tower = clients::tower::Configuration::builder()
.with_timeout(DEFAULT_TIMEOUT_FOR_REST_REQUESTS)
.with_timeout(REST_TIMEOUT)
.build_url(url)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'")
})?;

info!(
"API client is initialized with endpoint {}, request timeout = {:?}",
endpoint, DEFAULT_TIMEOUT_FOR_REST_REQUESTS,
"API client is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}"
);

Ok(Some(Self {
Expand Down Expand Up @@ -159,3 +160,57 @@ impl AppNodesClientWrapper {
Ok(())
}
}

/// Wrapper for Volumes REST API client.
#[derive(Clone)]
pub(crate) struct VolumesClientWrapper {
client: VolumesClient,
}

impl VolumesClientWrapper {
/// Initialize VolumesClientWrapper instance.
pub(crate) fn new(endpoint: &str) -> anyhow::Result<Self> {
/// TODO: what's the NodeStage timeout?
const REST_TIMEOUT: Duration = Duration::from_secs(10);

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?;

let config = clients::tower::Configuration::builder()
.with_timeout(REST_TIMEOUT)
.with_concurrency_limit(Some(10))
.build_url(url)
.map_err(|error| {
anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'")
})?;

info!(
"VolumesClient API is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}"
);

Ok(Self {
client: VolumesClient::new(Arc::new(config)),
})
}

/// Get the target URI for the given volume.
pub(crate) async fn volume_uri(
&self,
volume_id: &uuid::Uuid,
) -> Result<String, ApiClientError> {
let volume: stor_port::types::v0::openapi::models::Volume =
self.client.get_volume(volume_id).await?;
let Some(target) = volume.state.target else {
return Err(ApiClientError::Unavailable(
"Volume target is not available".into(),
));
};
if !matches!(target.state, NexusState::Online | NexusState::Degraded) {
return Err(ApiClientError::Unavailable(
"Volume target is not ready for I/O".into(),
));
}
// TODO: check for other volume statuses, example ONLINE?
Ok(target.device_uri)
}
}
38 changes: 29 additions & 9 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use grpc::csi_node_nvme::nvme_operations_server::NvmeOperationsServer;
use stor_port::platform;
use utils::tracing_telemetry::{FmtLayer, FmtStyle};

use crate::client::AppNodesClientWrapper;
use crate::client::{AppNodesClientWrapper, VolumesClientWrapper};
use clap::Arg;
use serde_json::json;
use std::{
Expand All @@ -32,8 +32,7 @@ use std::{
str::FromStr,
};
use tokio::net::UnixListener;
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

const GRPC_PORT: u16 = 50051;
Expand All @@ -58,6 +57,14 @@ pub(super) async fn main() -> anyhow::Result<()> {
.requires("rest-endpoint")
.help("Enable registration of the csi node with the control plane")
)
.arg(
Arg::new("enable-rest")
.long("enable-rest")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.requires("rest-endpoint")
.help("Enhance csi with added rest functionality")
)
.arg(
Arg::new("csi-socket")
.short('c')
Expand Down Expand Up @@ -322,10 +329,10 @@ pub(super) async fn main() -> anyhow::Result<()> {
.unwrap_or("/var/tmp/csi.sock");
// Remove stale CSI socket from previous instance if there is any.
match fs::remove_file(csi_socket) {
Ok(_) => info!("Removed stale CSI socket {}", csi_socket),
Ok(_) => info!("Removed stale CSI socket {csi_socket}"),
Err(err) => {
if err.kind() != ErrorKind::NotFound {
anyhow::bail!("Error removing stale CSI socket {}: {}", csi_socket, err);
anyhow::bail!("Error removing stale CSI socket {csi_socket}: {err}");
}
}
}
Expand Down Expand Up @@ -371,22 +378,35 @@ impl CsiServer {

let incoming = {
let uds = UnixListener::bind(csi_socket).unwrap();
info!("CSI plugin bound to {}", csi_socket);
info!("CSI plugin bound to {csi_socket}");

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
if let Err(e) = fs::set_permissions(
if let Err(error) = fs::set_permissions(
csi_socket,
std::os::unix::fs::PermissionsExt::from_mode(0o777),
) {
error!("Failed to change permissions for CSI socket: {:?}", e);
error!("Failed to change permissions for CSI socket: {error:?}");
} else {
debug!("Successfully changed file permissions for CSI socket");
}
UnixListenerStream::new(uds)
};

let node = Node::new(node_name.into(), node_selector, probe_filesystems());
let vol_client = match cli_args.get_one::<String>("rest-endpoint") {
Some(ep) if cli_args.get_flag("enable-rest") => Some(VolumesClientWrapper::new(ep)?),
_ => {
tracing::warn!("The rest client is not enabled - functionality may be limited");
None
}
};

let node = Node::new(
node_name.into(),
node_selector,
probe_filesystems(),
vol_client,
);
Ok(async move {
Server::builder()
.add_service(NodeServer::new(node))
Expand Down
3 changes: 1 addition & 2 deletions control-plane/csi-driver/src/bin/node/mount.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Utility functions for mounting and unmounting filesystems.
use crate::filesystem_ops::FileSystem;
use crate::runtime;
use crate::{filesystem_ops::FileSystem, runtime};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::mountinfo::{MountInfo, SafeMountIter};

Expand Down
50 changes: 40 additions & 10 deletions control-plane/csi-driver/src/bin/node/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
block_vol::{publish_block_volume, unpublish_block_volume},
client::VolumesClientWrapper,
dev::{sysfs_dev_size, Device},
filesystem_ops::FileSystem,
filesystem_vol::{publish_fs_volume, stage_fs_volume, unpublish_fs_volume, unstage_fs_volume},
Expand Down Expand Up @@ -36,11 +37,12 @@ macro_rules! failure {
}

/// The Csi Node implementation.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct Node {
node_name: String,
node_selector: HashMap<String, String>,
filesystems: Vec<FileSystem>,
vol_client: Option<VolumesClientWrapper>,
}

impl Node {
Expand All @@ -49,11 +51,13 @@ impl Node {
node_name: String,
node_selector: HashMap<String, String>,
filesystems: Vec<FileSystem>,
vol_client: Option<VolumesClientWrapper>,
) -> Node {
let self_ = Self {
node_name,
node_selector,
filesystems,
vol_client,
};
info!("Node topology segments: {:?}", self_.segments());
self_
Expand All @@ -73,6 +77,32 @@ impl Node {
.chain(vec![self.node_name_segment()])
.collect()
}

/// Get the target URI for the given volume.
/// If the REST volume client is enabled, use it to fetch the volume and its target URI.
/// Otherwise, use the publish context.
async fn volume_uri(
&self,
volume_id: &Uuid,
context: &HashMap<String, String>,
) -> Result<String, tonic::Status> {
let ctx_uri = context.get("uri").ok_or_else(|| {
failure!(
Code::InvalidArgument,
"Failed to stage volume {volume_id}: URI attribute missing from publish context"
)
})?;

let Some(client) = &self.vol_client else {
return Ok(ctx_uri.to_string());
};

let uri = client.volume_uri(volume_id).await?;
if &uri != ctx_uri {
tracing::warn!(volume.uri=%uri, ctx.uri=%ctx_uri, "Overriding URI volume context with URI from REST");
}
Ok(uri)
}
}

const ATTACH_TIMEOUT_INTERVAL: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -670,14 +700,6 @@ impl node_server::Node for Node {
}
};

let uri = msg.publish_context.get("uri").ok_or_else(|| {
failure!(
Code::InvalidArgument,
"Failed to stage volume {}: URI attribute missing from publish context",
&msg.volume_id
)
})?;

let uuid = Uuid::parse_str(&msg.volume_id).map_err(|error| {
failure!(
Code::Internal,
Expand All @@ -688,13 +710,21 @@ impl node_server::Node for Node {
})?;
let _guard = VolumeOpGuard::new(uuid)?;

// We cannot fully rely on the URI stored in the `publish_context` because it may change
// if the target gets moved to another node.
// It's possible that a pod gets restaged on the same node, and thus skipping the controller
// publish call, leaving us with a bad URI:
// https://github.com/openebs/mayastor/issues/1781
// In order to fix this issue we need to fetch the volume uri from the control-plane.
let uri = self.volume_uri(&uuid, &msg.publish_context).await?;

// Note checking existence of staging_target_path, is delegated to
// code handling those volume types where it is relevant.

// All checks complete, now attach, if not attached already.
debug!("Volume {} has URI {}", &msg.volume_id, uri);

let mut device = Device::parse(uri).map_err(|error| {
let mut device = Device::parse(&uri).map_err(|error| {
failure!(
Code::Internal,
"Failed to stage volume {}: error parsing URI {}: {}",
Expand Down
Loading

0 comments on commit 15c9918

Please sign in to comment.