diff --git a/control-plane/csi-driver/src/bin/controller/controller.rs b/control-plane/csi-driver/src/bin/controller/controller.rs index 7f74adcd6..df6c6f13e 100644 --- a/control-plane/csi-driver/src/bin/controller/controller.rs +++ b/control-plane/csi-driver/src/bin/controller/controller.rs @@ -254,9 +254,25 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(request = ?args); + let _permit = self.create_volume_permit().await?; + // k8s uses names pvc-{uuid} and we use uuid further as ID in SPDK so we + // must require it. + let re = Regex::new(VOLUME_NAME_PATTERN).unwrap(); + let volume_uuid = match re.captures(&args.name) { + Some(captures) => captures.get(1).unwrap().as_str().to_string(), + None => { + return Err(Status::invalid_argument(format!( + "Expected the volume name in pvc- format: {}", + args.name + ))) + } + }; + tracing::Span::current().record("volume.uuid", volume_uuid.as_str()); + + let req = csi_driver::trace::CsiRequest::new_info("Create Volume"); + let volume_content_source = if let Some(source) = &args.volume_content_source { match &source.r#type { Some(Type::Snapshot(snapshot_source)) => { @@ -284,20 +300,6 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { None }; - // k8s uses names pvc-{uuid} and we use uuid further as ID in SPDK so we - // must require it. - let re = Regex::new(VOLUME_NAME_PATTERN).unwrap(); - let volume_uuid = match re.captures(&args.name) { - Some(captures) => captures.get(1).unwrap().as_str().to_string(), - None => { - return Err(Status::invalid_argument(format!( - "Expected the volume name in pvc- format: {}", - args.name - ))) - } - }; - tracing::Span::current().record("volume.uuid", volume_uuid.as_str()); - check_volume_capabilities(&args.volume_capabilities)?; // Check volume size. @@ -342,8 +344,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { Ok(volume) => { check_existing_volume(&volume, replica_count, size, thin)?; debug!( - "Volume {} already exists and is compatible with requested config", - volume_uuid + "Volume {volume_uuid} already exists and is compatible with requested config" ); } // If the volume doesn't exist, create it. @@ -412,6 +413,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { accessible_topology: vt_mapper.volume_accessible_topology(), }; + req.info_ok(); Ok(Response::new(CreateVolumeResponse { volume: Some(volume), })) @@ -422,7 +424,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(volume.uuid = %args.volume_id, request = ?args); + let req = csi_driver::trace::CsiRequest::new_info("Delete Volume"); let volume_uuid = Uuid::parse_str(&args.volume_id).map_err(|_e| { Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id)) })?; @@ -430,13 +432,14 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { RestApiClient::get_client() .delete_volume(&volume_uuid) .await - .map_err(|e| { + .map_err(|error| { Status::internal(format!( - "Failed to delete volume {}, error = {:?}", - args.volume_id, e + "Failed to delete volume {}, error = {error:?}", + args.volume_id )) })?; + req.info_ok(); Ok(Response::new(DeleteVolumeResponse {})) } @@ -446,7 +449,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(volume.uuid = %args.volume_id, request = ?args); + let req = csi_driver::trace::CsiRequest::new_info("Publish Volume"); if args.readonly { return Err(Status::invalid_argument( "Read-only volumes are not supported", @@ -489,8 +492,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { Some(target) => { if target.protocol != Some(protocol) { let m = format!( - "Volume {} already shared via different protocol: {:?}", - volume_id, target.protocol, + "Volume {volume_id} already shared via different protocol: {:?}", + target.protocol, ); error!("{}", m); return Err(Status::failed_precondition(m)); @@ -548,10 +551,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { .await?; if let Some((node, uri)) = get_volume_share_location(&v) { - debug!( - "Volume {} successfully published on node {} via {}", - volume_id, node, uri - ); + debug!("Volume {volume_id} successfully published on node {node} via {uri}"); uri } else { let m = format!( @@ -565,22 +565,18 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { publish_context.insert("uri".to_string(), uri); - debug!( - "Publish context for volume {}: {:?}", - volume_id, publish_context - ); + tracing::info!(?publish_context, "{}", req.log_str()); Ok(Response::new(ControllerPublishVolumeResponse { publish_context, })) } - #[instrument(err, fields(volume.uuid = %request.get_ref().volume_id), skip(self))] async fn controller_unpublish_volume( &self, request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(volume.uuid = %args.volume_id, request = ?args); + let req = csi_driver::trace::CsiRequest::new_info("Unpublish Volume"); let volume_uuid = Uuid::parse_str(&args.volume_id).map_err(|_e| { Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id)) @@ -619,7 +615,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { )), })?; - debug!("Volume {} successfully unpublished", args.volume_id); + req.info_ok(); Ok(Response::new(ControllerUnpublishVolumeResponse {})) } @@ -629,9 +625,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(volume.uuid = %args.volume_id, request = ?args); + let _ = csi_driver::trace::CsiRequest::new_dbg("Validate Volume Capabilities"); - debug!("Request to validate volume capabilities: {:?}", args); let volume_uuid = Uuid::parse_str(&args.volume_id).map_err(|_e| { Status::invalid_argument(format!("Malformed volume UUID: {}", args.volume_id)) })?; @@ -681,8 +676,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(request = ?args); - + let req = csi_driver::trace::CsiRequest::new_trace("List Volumes"); let max_entries = args.max_entries; if max_entries < 0 { return Err(Status::invalid_argument("max_entries can't be negative")); @@ -714,8 +708,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { }) .collect(); - debug!("Available k8s volumes: {:?}", entries); - + tracing::trace!(?entries, "{}", req.log_str()); Ok(Response::new(ListVolumesResponse { entries, next_token: volumes.next_token.map_or("".to_string(), |v| v.to_string()), @@ -728,7 +721,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let args = request.into_inner(); - tracing::trace!(request = ?args); + let _ = csi_driver::trace::CsiRequest::new_trace("Get Node(s) Capacity"); // Check capabilities. check_volume_capabilities(&args.volume_capabilities)?; @@ -741,7 +734,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { }; let pools: Vec = if let Some(node) = node { - debug!("Calculating pool capacity for node {}", node); + debug!("Calculating pool capacity for node {node}"); RestApiClient::get_client() .get_node_pools(node) .await @@ -755,9 +748,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { RestApiClient::get_client() .list_pools() .await - .map_err(|e| { - Status::internal(format!("Failed to list all pools, error = {e:?}",)) - })? + .map_err(|e| Status::internal(format!("Failed to list all pools, error = {e:?}")))? }; let available_capacity: i64 = pools.into_iter().fold(0, |acc, p| match p.state { @@ -814,7 +805,6 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { request: tonic::Request, ) -> Result, tonic::Status> { let request = request.into_inner(); - tracing::trace!(volume.uuid = %request.source_volume_id, snapshot.uuid = %request.name, ?request); let volume_uuid = Uuid::parse_str(&request.source_volume_id).map_err(|_e| { Status::invalid_argument(format!( @@ -839,6 +829,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { let snap_uuid = Uuid::parse_str(&snapshot_uuid_str).map_err(|_e| { Status::invalid_argument(format!("Malformed snapshot ID: {}", request.name)) })?; + + let req = csi_driver::trace::CsiRequest::new_info("Create Snapshot"); let create_params = CreateSnapshotParams::try_from(&request.parameters)?; // Get the volume object. Extract the app node endpoint if quiesce is requested. @@ -915,6 +907,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { snapshot_creation_result }?; + req.info_ok(); Ok(tonic::Response::new(CreateSnapshotResponse { snapshot: Some(snapshot_to_csi(snapshot)), })) @@ -925,8 +918,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let req = csi_driver::trace::CsiRequest::new_info("Delete Snapshot"); let args = request.into_inner(); - tracing::trace!(snapshot.uuid = %args.snapshot_id, ?args); let snapshot_uuid = Uuid::parse_str(&args.snapshot_id).map_err(|_e| { Status::invalid_argument(format!("Malformed snapshot UUID: {}", args.snapshot_id)) @@ -942,6 +935,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { )) })?; + req.info_ok(); Ok(Response::new(DeleteSnapshotResponse {})) } @@ -985,13 +979,13 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { })) } - #[instrument(err, skip(self))] + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn controller_expand_volume( &self, request: Request, ) -> Result, Status> { let args = request.into_inner(); - trace!(volume.uuid = %args.volume_id, request = ?args); + let req = csi_driver::trace::CsiRequest::new_info("Expand Volume"); let vol_uuid = Uuid::parse_str(&args.volume_id).map_err(|error| { Status::invalid_argument(format!( @@ -1039,10 +1033,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc { error => Status::from(error), })?; - debug!( - size_bytes = requested_size, - "Expansion succeeded for volume {vol_uuid}" - ); + tracing::info!(size_bytes = requested_size, "{}", req.log_str()); Ok(tonic::Response::new(ControllerExpandVolumeResponse { capacity_bytes: vol.spec.size as i64, node_expansion_required, diff --git a/control-plane/csi-driver/src/bin/node/node.rs b/control-plane/csi-driver/src/bin/node/node.rs index 029cc11db..3f333ed5e 100644 --- a/control-plane/csi-driver/src/bin/node/node.rs +++ b/control-plane/csi-driver/src/bin/node/node.rs @@ -27,7 +27,7 @@ use rpc::{ use nix::{errno::Errno, sys}; use std::{collections::HashMap, path::Path, time::Duration, vec::Vec}; use tonic::{Code, Request, Response, Status}; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, instrument}; use uuid::Uuid; macro_rules! failure { @@ -240,13 +240,14 @@ impl node_server::Node for Node { /// and/or other arguments if the volume has MULTI_NODE capability (i.e., /// access_mode is either MULTI_NODE_READER_ONLY, MULTI_NODE_SINGLE_WRITER /// or MULTI_NODE_MULTI_WRITER). + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_publish_volume( &self, request: Request, ) -> Result, Status> { let msg = request.into_inner(); - trace!("node_publish_volume {:?}", msg); + let req = csi_driver::trace::CsiRequest::new_info("Node Publish Volume"); if msg.volume_id.is_empty() { return Err(failure!( @@ -311,6 +312,8 @@ impl node_server::Node for Node { publish_block_volume(&msg).await?; } } + + req.info_ok(); Ok(Response::new(NodePublishVolumeResponse {})) } @@ -322,13 +325,14 @@ impl node_server::Node for Node { /// given node and returns a success. /// /// This operation MUST be idempotent. + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_unpublish_volume( &self, request: Request, ) -> Result, Status> { let msg = request.into_inner(); - trace!("node_unpublish_volume {:?}", msg); + let req = csi_driver::trace::CsiRequest::new_info("Node Unpublish Volume"); if msg.volume_id.is_empty() { return Err(failure!( @@ -371,16 +375,19 @@ impl node_server::Node for Node { unpublish_block_volume(&msg).await?; } } + + req.info_ok(); Ok(Response::new(NodeUnpublishVolumeResponse {})) } /// Get volume stats method evaluates and returns capacity metrics. + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_get_volume_stats( &self, request: Request, ) -> Result, Status> { let msg = request.into_inner(); - trace!("node_get_volume_stats {:?}", msg); + let req = csi_driver::trace::CsiRequest::new_trace("Node Get Volume Stats"); if msg.volume_id.is_empty() { return Err(failure!( Code::InvalidArgument, @@ -397,28 +404,30 @@ impl node_server::Node for Node { let volume_path = Path::new(&msg.volume_path); if volume_path.exists() { - // Check if its a filesystem. + // Check if it's a filesystem. if volume_path.is_dir() { - trace!("Getting statfs metrics for : {:?}", volume_path); match sys::statfs::statfs(&*msg.volume_path) { - Ok(info) => Ok(Response::new(NodeGetVolumeStatsResponse { - usage: vec![ - csi::VolumeUsage { - total: info.blocks() as i64 * info.block_size(), - unit: csi::volume_usage::Unit::Bytes as i32, - available: info.blocks_available() as i64 * info.block_size(), - used: (info.blocks() - info.blocks_free()) as i64 - * info.block_size(), - }, - csi::VolumeUsage { - total: info.files() as i64, - unit: csi::volume_usage::Unit::Inodes as i32, - available: info.files_free() as i64, - used: (info.files() - info.files_free()) as i64, - }, - ], - volume_condition: None, - })), + Ok(info) => { + req.trace_ok(); + Ok(Response::new(NodeGetVolumeStatsResponse { + usage: vec![ + csi::VolumeUsage { + total: info.blocks() as i64 * info.block_size(), + unit: csi::volume_usage::Unit::Bytes as i32, + available: info.blocks_available() as i64 * info.block_size(), + used: (info.blocks() - info.blocks_free()) as i64 + * info.block_size(), + }, + csi::VolumeUsage { + total: info.files() as i64, + unit: csi::volume_usage::Unit::Inodes as i32, + available: info.files_free() as i64, + used: (info.files() - info.files_free()) as i64, + }, + ], + volume_condition: None, + })) + } Err(err) => match err { Errno::ENOENT => Err(Status::new(Code::NotFound, err.to_string())), Errno::EIO => Err(Status::new(Code::Internal, err.to_string())), @@ -438,12 +447,13 @@ impl node_server::Node for Node { } } + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_expand_volume( &self, request: Request, ) -> Result, Status> { let args = request.into_inner(); - trace!(volume.uuid = %args.volume_id, request = ?args); + let req = csi_driver::trace::CsiRequest::new_info("Node Expand Volume"); //===============================CsiAccessType============================================= // A type alias for better readability, and also easier conversions @@ -607,20 +617,18 @@ impl node_server::Node for Node { .await .map_err(|err| failure!(Code::Internal, "{}", err))?; - debug!( - size_bytes = required_bytes, - "Expansion succeeded for volume {vol_uuid}" - ); + tracing::info!(size_bytes = required_bytes, "{}", req.log_str()); success_result } + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_stage_volume( &self, request: Request, ) -> Result, Status> { let msg = request.into_inner(); - trace!("node_stage_volume {:?}", msg); + let req = csi_driver::trace::CsiRequest::new_info("Node Stage Volume"); if msg.volume_id.is_empty() { return Err(failure!( @@ -781,16 +789,18 @@ impl node_server::Node for Node { } } + req.info_ok(); Ok(Response::new(NodeStageVolumeResponse {})) } + #[instrument(err, fields(volume.uuid = request.get_ref().volume_id), skip(self))] async fn node_unstage_volume( &self, request: Request, ) -> Result, Status> { let msg = request.into_inner(); - trace!("node_unstage_volume {:?}", msg); + let req = csi_driver::trace::CsiRequest::new_info("Node Unstage Volume"); if msg.volume_id.is_empty() { return Err(failure!( @@ -841,7 +851,8 @@ impl node_server::Node for Node { format!("Failed to unstage volume {}:", &msg.volume_id), ) .await?; - info!("Volume {} unstaged", &msg.volume_id); + + req.info_ok(); Ok(Response::new(NodeUnstageVolumeResponse {})) } } diff --git a/control-plane/csi-driver/src/lib.rs b/control-plane/csi-driver/src/lib.rs index 9b7476a92..6bc211bfd 100644 --- a/control-plane/csi-driver/src/lib.rs +++ b/control-plane/csi-driver/src/lib.rs @@ -71,3 +71,5 @@ pub mod limiter; /// Contains tools to advertise the same set of capabilities across different /// CSI microservices. pub mod plugin_capabilities; +/// Request Tracing. +pub mod trace; diff --git a/control-plane/csi-driver/src/trace.rs b/control-plane/csi-driver/src/trace.rs new file mode 100644 index 000000000..3a631fc4b --- /dev/null +++ b/control-plane/csi-driver/src/trace.rs @@ -0,0 +1,51 @@ +/// A csi request which can be traced. +pub struct CsiRequest<'a> { + request: &'a str, + start: std::time::Instant, +} + +impl<'a> CsiRequest<'a> { + /// Add new request and info trace it. + pub fn new_info(request: &'a str) -> Self { + tracing::info!("[ CSI ] {request} Request started"); + + Self::new(request) + } + /// Add new request and debug trace it. + pub fn new_dbg(request: &'a str) -> Self { + tracing::debug!("[ CSI ] {request} Request started"); + + Self::new(request) + } + /// Add new request and trace trace it. + pub fn new_trace(request: &'a str) -> Self { + tracing::trace!("[ CSI ] {request} Request started"); + + Self::new(request) + } + + fn new(request: &'a str) -> Self { + Self { + request, + start: std::time::Instant::now(), + } + } + + /// Get completion log message. + pub fn log_str(self) -> String { + format!( + "[ CSI ] {} Request completed successfully after {:?}", + self.request, + self.start.elapsed() + ) + } + + /// Log completion info. + pub fn info_ok(self) { + tracing::info!("{}", self.log_str()) + } + /// Log completion trace. + pub fn trace_ok(self) { + tracing::info!("{}", self.log_str()) + } +} diff --git a/scripts/python/test-residue-cleanup.sh b/scripts/python/test-residue-cleanup.sh index be97ad91f..52f7bc65b 100755 --- a/scripts/python/test-residue-cleanup.sh +++ b/scripts/python/test-residue-cleanup.sh @@ -1,5 +1,10 @@ #!/usr/bin/env bash +SCRIPT_DIR="$(dirname "$0")" + +# Cleans up the deployer's leftovers +"$SCRIPT_DIR"/../rust/deployer-cleanup.sh || true + # Cleans up the iptables rules added by bdd tests set -euo pipefail @@ -13,4 +18,4 @@ sed -i '/.*--comment.* "added by bdd tests"/d' iptables.backup # Restore the rules sudo iptables-restore < iptables.backup # Remove the temporary backup file -sudo rm iptables.backup +sudo rm iptables.backup \ No newline at end of file diff --git a/scripts/python/test.sh b/scripts/python/test.sh index a953ad4bb..02c037de2 100755 --- a/scripts/python/test.sh +++ b/scripts/python/test.sh @@ -19,7 +19,6 @@ REPORT="$ROOT_DIR/report.xml" cleanup() { "$SCRIPT_DIR"/test-residue-cleanup.sh || true - "$SCRIPT_DIR"/../rust/deployer-cleanup.sh || true } cleanup_handler() {