Skip to content

Commit

Permalink
chore(bors): merge pull request #889
Browse files Browse the repository at this point in the history
889: fix(volume): don't allow unpublish from non-frontend node r=dsharma-dc a=dsharma-dc

In case application and volume target have already moved to a different node and there is a lagging ControllerUnpublish request that comes later, then this unpublish call can delete the newly created target, which is undesirable and leads to volume staging failures.

Co-authored-by: Diwakar Sharma <[email protected]>
  • Loading branch information
mayastor-bors and dsharma-dc committed Dec 4, 2024
2 parents 4d13dbf + 5a4695d commit 1bdbfcb
Show file tree
Hide file tree
Showing 17 changed files with 239 additions and 29 deletions.
5 changes: 4 additions & 1 deletion control-plane/agents/src/bin/core/tests/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ async fn events() {
let vol_client = cluster.grpc_client().volume();

vol_client
.unpublish(&UnpublishVolume::new(&volid, false), None)
.unpublish(
&UnpublishVolume::new(&volid, false, vec![cluster.csi_node(0).into()]),
None,
)
.await
.unwrap();

Expand Down
6 changes: 5 additions & 1 deletion control-plane/agents/src/bin/core/tests/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,11 @@ async fn disown_unused_replicas() {

cluster.composer().pause(&node).await.unwrap();
volumes_api
.del_volume_target(&volume.spec.uuid, Some(false))
.del_volume_target(
&volume.spec.uuid,
Some(false),
Some(cluster.csi_node(0).as_str()),
)
.await
.expect_err("io-engine is down");
cluster.composer().kill(&node).await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ async fn unused_reconcile(cluster: &Cluster) {
cluster.composer().kill(&nexus_node.id).await.unwrap();
// 2. now we force unpublish the volume
volumes_api
.del_volume_target(&volume.spec.uuid, Some(true))
.del_volume_target(
&volume.spec.uuid,
Some(true),
Some(cluster.csi_node(0).as_str()),
)
.await
.unwrap();
// 3. publish on the previously unused node
Expand Down
40 changes: 29 additions & 11 deletions control-plane/agents/src/bin/core/tests/volume/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ async fn nexus_persistence_test_iteration(
let nexus_uuid = nexus.uuid.clone();

volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, false), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, false, vec![]),
None,
)
.await
.unwrap();

Expand Down Expand Up @@ -379,7 +382,10 @@ async fn publishing_test(cluster: &Cluster) {
.expect_err("The Volume cannot be published again because it's already published");

volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, false), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, false, vec![]),
None,
)
.await
.unwrap();

Expand Down Expand Up @@ -447,7 +453,10 @@ async fn publishing_test(cluster: &Cluster) {
.expect_err("The volume is already published");

volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, false), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, false, vec![]),
None,
)
.await
.unwrap();

Expand Down Expand Up @@ -491,12 +500,18 @@ async fn publishing_test(cluster: &Cluster) {
cluster.composer().kill(target_node.as_str()).await.unwrap();

volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, false), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, false, vec![]),
None,
)
.await
.expect_err("The node is not online...");

volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, true), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, true, vec![]),
None,
)
.await
.expect("With force comes great responsibility...");

Expand Down Expand Up @@ -747,7 +762,10 @@ async fn replica_count_test(cluster: &Cluster) {

let volume_state = volume.state();
volume_client
.unpublish(&UnpublishVolume::new(&volume_state.uuid, false), None)
.unpublish(
&UnpublishVolume::new(&volume_state.uuid, false, vec![]),
None,
)
.await
.unwrap();

Expand Down Expand Up @@ -927,7 +945,7 @@ async fn publish_unpublish(cluster: &Cluster) {
// Unpublish the volume2
let _ = volume_client
.unpublish(
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false),
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false, vec![]),
None,
)
.await
Expand All @@ -936,7 +954,7 @@ async fn publish_unpublish(cluster: &Cluster) {
// Unpublish the volume1
let _ = volume_client
.unpublish(
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_1).unwrap(), false),
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_1).unwrap(), false, vec![]),
None,
)
.await
Expand Down Expand Up @@ -983,14 +1001,14 @@ async fn target_distribution(cluster: &Cluster) {
// Cleanup
let _ = volume_client
.unpublish(
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_1).unwrap(), false),
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_1).unwrap(), false, vec![]),
None,
)
.await
.expect("The volume should be unpublished");
let _ = volume_client
.unpublish(
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false),
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false, vec![]),
None,
)
.await
Expand Down Expand Up @@ -1063,7 +1081,7 @@ async fn offline_node(cluster: &Cluster) {
// Unpublish volume2
let _ = volume_client
.unpublish(
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false),
&UnpublishVolume::new(&VolumeId::try_from(VOLUME_2).unwrap(), false, vec![]),
None,
)
.await
Expand Down
20 changes: 20 additions & 0 deletions control-plane/agents/src/bin/core/volume/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,26 @@ impl ResourcePublishing for OperationGuardArc<VolumeSpec> {
.await?;

let volume_target = spec_clone.target().expect("already validated");
let frontend_nodes = &request.frontend_nodes;
tracing::debug!("unpublish_volume: frontend nodes {frontend_nodes:?}");
if !frontend_nodes.is_empty() {
if let Some(tgt_cfg) = spec_clone.active_config() {
for unode in frontend_nodes {
if !tgt_cfg.frontend().nodename_allowed(unode.as_str()) {
self.validate_update_step(
registry,
Err(SvcError::FrontendNodeNotAllowed {
node: unode.to_string(),
vol_id: request.uuid.to_string(),
}),
&spec_clone,
)
.await?;
}
}
}
}

let result = match specs.nexus_opt(volume_target.nexus()).await? {
None => Ok(()),
Some(mut nexus) => {
Expand Down
4 changes: 3 additions & 1 deletion control-plane/csi-driver/src/bin/controller/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl From<clients::tower::Error<RestJsonError>> for ApiClientError {
StatusCode::PRECONDITION_FAILED => Self::PreconditionFailed(detailed),
StatusCode::BAD_REQUEST => Self::InvalidArgument(detailed),
StatusCode::NOT_ACCEPTABLE => Self::NotAcceptable(detailed),
StatusCode::UNAUTHORIZED => Self::NotAcceptable(detailed),
status => Self::GenericOperation(status, detailed),
}
}
Expand Down Expand Up @@ -365,11 +366,12 @@ impl RestApiClient {
&self,
volume_id: &uuid::Uuid,
force: bool,
frontend_host: Option<&str>,
) -> Result<(), ApiClientError> {
Self::delete_idempotent(
self.rest_client
.volumes_api()
.del_volume_target(volume_id, Some(force))
.del_volume_target(volume_id, Some(force), frontend_host)
.await,
true,
)?;
Expand Down
13 changes: 8 additions & 5 deletions control-plane/csi-driver/src/bin/controller/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,18 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Ok(Response::new(ControllerUnpublishVolumeResponse {}));
}

// Do forced volume upublish as Kubernetes already detached the volume.
// Do forced volume unpublish as Kubernetes already detached the volume.
RestApiClient::get_client()
.unpublish_volume(&volume_uuid, true)
.unpublish_volume(&volume_uuid, true, Some(args.node_id.as_str()))
.await
.map_err(|e| {
Status::not_found(format!(
.map_err(|e| match e {
ApiClientError::NotAcceptable(_) => {
Status::ok("Ignoring failure on unpublish due to mismatched host")
}
_ => Status::not_found(format!(
"Failed to unpublish volume {}, error = {:?}",
&args.volume_id, e
))
)),
})?;

debug!("Volume {} successfully unpublished", args.volume_id);
Expand Down
2 changes: 2 additions & 0 deletions control-plane/grpc/proto/v1/volume/volume.proto
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ message UnpublishVolumeRequest {
// the nexus. Note: this option should be used only when we know the node will not become
// accessible again and it is safe to do so.
bool force = 2;
// frontend host requesting for volume unpublish.
repeated string frontend_nodes = 3;
}

// Share Volume request
Expand Down
13 changes: 12 additions & 1 deletion control-plane/grpc/src/operations/volume/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,8 @@ pub trait UnpublishVolumeInfo: Send + Sync + std::fmt::Debug {
fn uuid(&self) -> VolumeId;
/// Force unpublish
fn force(&self) -> bool;
/// Frontend node requesting unpublish.
fn frontend_nodes(&self) -> Vec<String>;
}

impl UnpublishVolumeInfo for UnpublishVolume {
Expand All @@ -1628,6 +1630,10 @@ impl UnpublishVolumeInfo for UnpublishVolume {
fn force(&self) -> bool {
self.force()
}

fn frontend_nodes(&self) -> Vec<String> {
self.frontend_nodes.clone()
}
}

/// Intermediate structure that validates the conversion to UnpublishVolumeRequest type.
Expand All @@ -1644,6 +1650,10 @@ impl UnpublishVolumeInfo for ValidatedUnpublishVolumeRequest {
fn force(&self) -> bool {
self.inner.force
}

fn frontend_nodes(&self) -> Vec<String> {
self.inner.frontend_nodes.clone()
}
}

impl ValidateRequestTypes for UnpublishVolumeRequest {
Expand All @@ -1658,7 +1668,7 @@ impl ValidateRequestTypes for UnpublishVolumeRequest {

impl From<&dyn UnpublishVolumeInfo> for UnpublishVolume {
fn from(data: &dyn UnpublishVolumeInfo) -> Self {
UnpublishVolume::new(&data.uuid(), data.force())
UnpublishVolume::new(&data.uuid(), data.force(), data.frontend_nodes())
}
}

Expand All @@ -1667,6 +1677,7 @@ impl From<&dyn UnpublishVolumeInfo> for UnpublishVolumeRequest {
Self {
uuid: Some(data.uuid().to_string()),
force: data.force(),
frontend_nodes: data.frontend_nodes().iter().map(|n| n.into()).collect(),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions control-plane/rest/openapi-specs/v0_api_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1710,6 +1710,13 @@ paths:
schema:
type: boolean
default: false
- in: query
name: frontend_host
description: |-
Frontend host requesting the volume unpublish.
required: false
schema:
type: string
responses:
'200':
description: OK
Expand Down
8 changes: 6 additions & 2 deletions control-plane/rest/service/src/v0/volumes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ impl apis::actix_server::Volumes for RestApi {

async fn del_volume_target(
Path(volume_id): Path<Uuid>,
Query(force): Query<Option<bool>>,
Query((force, frontend_host)): Query<(Option<bool>, Option<String>)>,
) -> Result<models::Volume, RestError<RestJsonError>> {
let volume = client()
.unpublish(
&UnpublishVolume::new(&volume_id.into(), force.unwrap_or(false)),
&UnpublishVolume::new(
&volume_id.into(),
force.unwrap_or(false),
frontend_host.into_iter().collect(),
),
None,
)
.await?;
Expand Down
Loading

0 comments on commit 1bdbfcb

Please sign in to comment.