Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(csi/stage): fetch uri via REST #903

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ repos:
- id: rust-lint
name: Rust lint
description: Run cargo clippy on files included in the commit. clippy should be installed before-hand.
entry: nix-shell --pure --run 'cargo-clippy --all --all-targets -- -D warnings'
entry: nix-shell --pure --run './scripts/rust/linter.sh clippy'
pass_filenames: false
types: [file, rust]
language: system
- id: rust-style
name: Rust style
description: Run cargo fmt on files included in the commit. rustfmt should be installed before-hand.
entry: nix-shell --pure --run 'cargo-fmt --all -- --check'
entry: nix-shell --pure --run './scripts/rust/linter.sh fmt'
exclude: openapi/
pass_filenames: false
types: [file, rust]
Expand Down
3 changes: 1 addition & 2 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ pipeline {
steps {
sh 'printenv'
sh 'nix-shell --run "./scripts/rust/generate-openapi-bindings.sh"'
sh 'nix-shell --run "cargo fmt --all -- --check"'
sh 'nix-shell --run "cargo clippy --all-targets -- -D warnings"'
sh 'nix-shell --run "./scripts/rust/linter.sh"'
sh 'nix-shell --run "black --check tests/bdd"'
sh 'nix-shell --run "./scripts/git/check-submodule-branches.sh"'
}
Expand Down
3 changes: 1 addition & 2 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use rpc::csi::{controller_server::ControllerServer, identity_server::IdentitySer

use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

pub(super) struct CsiServer {}
Expand Down
81 changes: 68 additions & 13 deletions control-plane/csi-driver/src/bin/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ use stor_port::types::v0::openapi::{
models::{RegisterAppNode, RestJsonError},
};

use anyhow::{anyhow, Result};
use anyhow::anyhow;
use std::{collections::HashMap, sync::Arc, time::Duration};
use stor_port::types::v0::openapi::apis::app_nodes_api::tower::client::direct::AppNodes;
use stor_port::types::v0::openapi::{
apis::{
app_nodes_api::tower::client::direct::AppNodes,
volumes_api::tower::client::{direct::Volumes, VolumesClient},
},
models::NexusState,
};
use tonic::Status;
use tracing::info;

Expand Down Expand Up @@ -92,9 +98,6 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
}
}

/// Default rest api timeout for requests.
const DEFAULT_TIMEOUT_FOR_REST_REQUESTS: Duration = Duration::from_secs(5);

/// Wrapper for AppNodes REST API client.
pub(crate) struct AppNodesClientWrapper {
client: AppNodesClient,
Expand All @@ -105,26 +108,24 @@ impl AppNodesClientWrapper {
pub(crate) fn initialize(
endpoint: Option<&String>,
) -> anyhow::Result<Option<AppNodesClientWrapper>> {
const REST_TIMEOUT: Duration = Duration::from_secs(5);

let Some(endpoint) = endpoint else {
return Ok(None);
};

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {}: {:?}", endpoint, error))?;
.map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?;

let tower = clients::tower::Configuration::builder()
.with_timeout(DEFAULT_TIMEOUT_FOR_REST_REQUESTS)
.with_timeout(REST_TIMEOUT)
.build_url(url)
.map_err(|error| {
anyhow::anyhow!(
"Failed to create openapi configuration, Error: '{:?}'",
error
)
anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'")
})?;

info!(
"API client is initialized with endpoint {}, request timeout = {:?}",
endpoint, DEFAULT_TIMEOUT_FOR_REST_REQUESTS,
"API client is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}"
);

Ok(Some(Self {
Expand Down Expand Up @@ -159,3 +160,57 @@ impl AppNodesClientWrapper {
Ok(())
}
}

/// Wrapper for Volumes REST API client.
#[derive(Clone)]
pub(crate) struct VolumesClientWrapper {
client: VolumesClient,
}

impl VolumesClientWrapper {
/// Initialize VolumesClientWrapper instance.
pub(crate) fn new(endpoint: &str) -> anyhow::Result<Self> {
/// TODO: what's the NodeStage timeout?
const REST_TIMEOUT: Duration = Duration::from_secs(10);

let url = clients::tower::Url::parse(endpoint)
.map_err(|error| anyhow!("Invalid API endpoint URL {endpoint}: {error:?}"))?;

let config = clients::tower::Configuration::builder()
.with_timeout(REST_TIMEOUT)
.with_concurrency_limit(Some(10))
.build_url(url)
.map_err(|error| {
anyhow::anyhow!("Failed to create openapi configuration, Error: '{error:?}'")
})?;

info!(
"VolumesClient API is initialized with endpoint {endpoint}, request timeout = {REST_TIMEOUT:?}"
);

Ok(Self {
client: VolumesClient::new(Arc::new(config)),
})
}

/// Get the target URI for the given volume.
pub(crate) async fn volume_uri(
&self,
volume_id: &uuid::Uuid,
) -> Result<String, ApiClientError> {
let volume: stor_port::types::v0::openapi::models::Volume =
self.client.get_volume(volume_id).await?;
let Some(target) = volume.state.target else {
return Err(ApiClientError::Unavailable(
"Volume target is not available".into(),
));
};
if !matches!(target.state, NexusState::Online | NexusState::Degraded) {
return Err(ApiClientError::Unavailable(
"Volume target is not ready for I/O".into(),
));
}
// TODO: check for other volume statuses, example ONLINE?
Ok(target.device_uri)
}
}
38 changes: 29 additions & 9 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use grpc::csi_node_nvme::nvme_operations_server::NvmeOperationsServer;
use stor_port::platform;
use utils::tracing_telemetry::{FmtLayer, FmtStyle};

use crate::client::AppNodesClientWrapper;
use crate::client::{AppNodesClientWrapper, VolumesClientWrapper};
use clap::Arg;
use serde_json::json;
use std::{
Expand All @@ -32,8 +32,7 @@ use std::{
str::FromStr,
};
use tokio::net::UnixListener;
use tonic::codegen::tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

const GRPC_PORT: u16 = 50051;
Expand All @@ -58,6 +57,14 @@ pub(super) async fn main() -> anyhow::Result<()> {
.requires("rest-endpoint")
.help("Enable registration of the csi node with the control plane")
)
.arg(
Arg::new("enable-rest")
.long("enable-rest")
.action(clap::ArgAction::SetTrue)
.value_name("BOOLEAN")
.requires("rest-endpoint")
.help("Enhance csi with added rest functionality")
)
tiagolobocastro marked this conversation as resolved.
Show resolved Hide resolved
.arg(
Arg::new("csi-socket")
.short('c')
Expand Down Expand Up @@ -322,10 +329,10 @@ pub(super) async fn main() -> anyhow::Result<()> {
.unwrap_or("/var/tmp/csi.sock");
// Remove stale CSI socket from previous instance if there is any.
match fs::remove_file(csi_socket) {
Ok(_) => info!("Removed stale CSI socket {}", csi_socket),
Ok(_) => info!("Removed stale CSI socket {csi_socket}"),
Err(err) => {
if err.kind() != ErrorKind::NotFound {
anyhow::bail!("Error removing stale CSI socket {}: {}", csi_socket, err);
anyhow::bail!("Error removing stale CSI socket {csi_socket}: {err}");
}
}
}
Expand Down Expand Up @@ -371,22 +378,35 @@ impl CsiServer {

let incoming = {
let uds = UnixListener::bind(csi_socket).unwrap();
info!("CSI plugin bound to {}", csi_socket);
info!("CSI plugin bound to {csi_socket}");

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
if let Err(e) = fs::set_permissions(
if let Err(error) = fs::set_permissions(
csi_socket,
std::os::unix::fs::PermissionsExt::from_mode(0o777),
) {
error!("Failed to change permissions for CSI socket: {:?}", e);
error!("Failed to change permissions for CSI socket: {error:?}");
} else {
debug!("Successfully changed file permissions for CSI socket");
}
UnixListenerStream::new(uds)
};

let node = Node::new(node_name.into(), node_selector, probe_filesystems());
let vol_client = match cli_args.get_one::<String>("rest-endpoint") {
Some(ep) if cli_args.get_flag("enable-rest") => Some(VolumesClientWrapper::new(ep)?),
_ => {
tracing::warn!("The rest client is not enabled - functionality may be limited");
None
}
};

let node = Node::new(
node_name.into(),
node_selector,
probe_filesystems(),
vol_client,
);
Ok(async move {
Server::builder()
.add_service(NodeServer::new(node))
Expand Down
3 changes: 1 addition & 2 deletions control-plane/csi-driver/src/bin/node/mount.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Utility functions for mounting and unmounting filesystems.
use crate::filesystem_ops::FileSystem;
use crate::runtime;
use crate::{filesystem_ops::FileSystem, runtime};
use csi_driver::filesystem::FileSystem as Fs;
use devinfo::mountinfo::{MountInfo, SafeMountIter};

Expand Down
50 changes: 40 additions & 10 deletions control-plane/csi-driver/src/bin/node/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
block_vol::{publish_block_volume, unpublish_block_volume},
client::VolumesClientWrapper,
dev::{sysfs_dev_size, Device},
filesystem_ops::FileSystem,
filesystem_vol::{publish_fs_volume, stage_fs_volume, unpublish_fs_volume, unstage_fs_volume},
Expand Down Expand Up @@ -36,11 +37,12 @@ macro_rules! failure {
}

/// The Csi Node implementation.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct Node {
node_name: String,
node_selector: HashMap<String, String>,
filesystems: Vec<FileSystem>,
vol_client: Option<VolumesClientWrapper>,
}

impl Node {
Expand All @@ -49,11 +51,13 @@ impl Node {
node_name: String,
node_selector: HashMap<String, String>,
filesystems: Vec<FileSystem>,
vol_client: Option<VolumesClientWrapper>,
) -> Node {
let self_ = Self {
node_name,
node_selector,
filesystems,
vol_client,
};
info!("Node topology segments: {:?}", self_.segments());
self_
Expand All @@ -73,6 +77,32 @@ impl Node {
.chain(vec![self.node_name_segment()])
.collect()
}

/// Get the target URI for the given volume.
/// If the REST volume client is enabled, use it to fetch the volume and its target URI.
/// Otherwise, use the publish context.
async fn volume_uri(
&self,
volume_id: &Uuid,
context: &HashMap<String, String>,
) -> Result<String, tonic::Status> {
let ctx_uri = context.get("uri").ok_or_else(|| {
failure!(
Code::InvalidArgument,
"Failed to stage volume {volume_id}: URI attribute missing from publish context"
)
})?;

let Some(client) = &self.vol_client else {
return Ok(ctx_uri.to_string());
};

let uri = client.volume_uri(volume_id).await?;
if &uri != ctx_uri {
tracing::warn!(volume.uri=%uri, ctx.uri=%ctx_uri, "Overriding URI volume context with URI from REST");
}
Ok(uri)
tiagolobocastro marked this conversation as resolved.
Show resolved Hide resolved
}
}

const ATTACH_TIMEOUT_INTERVAL: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -670,14 +700,6 @@ impl node_server::Node for Node {
}
};

let uri = msg.publish_context.get("uri").ok_or_else(|| {
failure!(
Code::InvalidArgument,
"Failed to stage volume {}: URI attribute missing from publish context",
&msg.volume_id
)
})?;

let uuid = Uuid::parse_str(&msg.volume_id).map_err(|error| {
failure!(
Code::Internal,
Expand All @@ -688,13 +710,21 @@ impl node_server::Node for Node {
})?;
let _guard = VolumeOpGuard::new(uuid)?;

// We cannot fully rely on the URI stored in the `publish_context` because it may change
// if the target gets moved to another node.
// It's possible that a pod gets restaged on the same node, and thus skipping the controller
// publish call, leaving us with a bad URI:
// https://github.com/openebs/mayastor/issues/1781
// In order to fix this issue we need to fetch the volume uri from the control-plane.
let uri = self.volume_uri(&uuid, &msg.publish_context).await?;

// Note checking existence of staging_target_path, is delegated to
// code handling those volume types where it is relevant.

// All checks complete, now attach, if not attached already.
debug!("Volume {} has URI {}", &msg.volume_id, uri);

let mut device = Device::parse(uri).map_err(|error| {
let mut device = Device::parse(&uri).map_err(|error| {
failure!(
Code::Internal,
"Failed to stage volume {}: error parsing URI {}: {}",
Expand Down
Loading
Loading