diff --git a/crate/cmd_rt/src/cmd_block/cmd_block_error.rs b/crate/cmd_rt/src/cmd_block/cmd_block_error.rs index cd81d403e..001b8acf2 100644 --- a/crate/cmd_rt/src/cmd_block/cmd_block_error.rs +++ b/crate/cmd_rt/src/cmd_block/cmd_block_error.rs @@ -12,6 +12,7 @@ where E: Debug, { /// Error originated from `CmdBlock` code. + #[error("`CmdBlock` block logic failed.")] Block { /// Outcome accumulator at the point of error. outcome_acc: OutcomeAcc, @@ -19,6 +20,7 @@ where error: E, }, /// Error originated from at least one item. + #[error("`CmdBlock` item logic failed.")] Item { /// Outcome accumulator at the point of error. outcome_acc: OutcomeAcc, diff --git a/crate/cmd_rt/src/cmd_block/cmd_block_rt.rs b/crate/cmd_rt/src/cmd_block/cmd_block_rt.rs index 26ebd2f1d..ef15d9156 100644 --- a/crate/cmd_rt/src/cmd_block/cmd_block_rt.rs +++ b/crate/cmd_rt/src/cmd_block/cmd_block_rt.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use async_trait::async_trait; use peace_cmd::scopes::SingleProfileSingleFlowView; @@ -24,9 +24,16 @@ pub trait CmdBlockRt: Debug + Unpin { /// Executes this command block. async fn exec( - self: Pin>, + &self, view: &mut SingleProfileSingleFlowView<'_, Self::Error, Self::PKeys, SetUp>, #[cfg(feature = "output_progress")] progress_tx: Sender, input: Box, ) -> Result, Self::Error>, Self::Error>; + + /// Runs the error handler and maps the `CmdBlock`'s `Outcome` to + /// `CmdExecution::Outcome`. + /// + /// This allows a `Cmd` to run logic to map an intermediate `CmdBlock`s + /// outcome which contains item failures, to the `CmdExection` outcome type. + fn execution_outcome_from(&self, outcome_acc: Box) -> Box; } diff --git a/crate/cmd_rt/src/cmd_block/cmd_block_wrapper.rs b/crate/cmd_rt/src/cmd_block/cmd_block_wrapper.rs index 4efc70a52..fca341246 100644 --- a/crate/cmd_rt/src/cmd_block/cmd_block_wrapper.rs +++ b/crate/cmd_rt/src/cmd_block/cmd_block_wrapper.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, marker::PhantomData, pin::Pin}; +use std::{fmt::Debug, marker::PhantomData}; use async_trait::async_trait; use peace_cfg::ItemId; @@ -21,76 +21,133 @@ cfg_if::cfg_if! { /// /// [`CmdBlockRt`]: crate::CmdBlockRt #[derive(Debug)] -pub struct CmdBlockWrapper { +pub struct CmdBlockWrapper< + CB, + E, + PKeys, + ExecutionOutcome, + BlockOutcome, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, +> { /// Underlying `CmdBlock` implementation. /// /// The trait constraints are applied on impl blocks. cmd_block: CB, /// Seed function for `OutcomeAcc`. - fn_outcome_acc_init: fn() -> OutcomeAcc, + fn_outcome_acc_init: fn() -> BlockOutcomeAcc, + /// Function to run if an item failure happens while executing this + /// `CmdBlock`. + fn_error_handler: fn(Box) -> ExecutionOutcome, /// Marker. - marker: PhantomData<(E, PKeys, Outcome, OutcomePartial, InputT)>, + marker: PhantomData<(E, PKeys, BlockOutcome, BlockOutcomePartial, InputT)>, } -impl - CmdBlockWrapper +impl + CmdBlockWrapper< + CB, + E, + PKeys, + ExecutionOutcome, + BlockOutcome, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, + > where CB: CmdBlock< Error = E, PKeys = PKeys, - OutcomeAcc = OutcomeAcc, - OutcomePartial = OutcomePartial, + OutcomeAcc = BlockOutcomeAcc, + OutcomePartial = BlockOutcomePartial, InputT = InputT, >, { - pub fn new(cmd_block: CB, fn_outcome_acc_init: fn() -> OutcomeAcc) -> Self { + pub fn new( + cmd_block: CB, + fn_outcome_acc_init: fn() -> BlockOutcomeAcc, + fn_error_handler: fn(Box) -> ExecutionOutcome, + ) -> Self { Self { cmd_block, fn_outcome_acc_init, + fn_error_handler, marker: PhantomData, } } } -impl From<(CB, fn() -> OutcomeAcc)> - for CmdBlockWrapper +impl + From<( + CB, + fn() -> BlockOutcomeAcc, + fn(Box) -> ExecutionOutcome, + )> + for CmdBlockWrapper< + CB, + E, + PKeys, + ExecutionOutcome, + BlockOutcome, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, + > where CB: CmdBlock< Error = E, PKeys = PKeys, - OutcomeAcc = OutcomeAcc, - OutcomePartial = OutcomePartial, + OutcomeAcc = BlockOutcomeAcc, + OutcomePartial = BlockOutcomePartial, InputT = InputT, >, { - fn from((cmd_block, fn_outcome_acc_init): (CB, fn() -> OutcomeAcc)) -> Self { - Self::new(cmd_block, fn_outcome_acc_init) + fn from( + (cmd_block, fn_outcome_acc_init, fn_error_handler): ( + CB, + fn() -> BlockOutcomeAcc, + fn(Box) -> ExecutionOutcome, + ), + ) -> Self { + Self::new(cmd_block, fn_outcome_acc_init, fn_error_handler) } } #[async_trait(?Send)] -impl CmdBlockRt - for CmdBlockWrapper +impl + CmdBlockRt + for CmdBlockWrapper< + CB, + E, + PKeys, + ExecutionOutcome, + BlockOutcome, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, + > where CB: CmdBlock< Error = E, PKeys = PKeys, - Outcome = Outcome, - OutcomeAcc = OutcomeAcc, - OutcomePartial = OutcomePartial, + Outcome = BlockOutcome, + OutcomeAcc = BlockOutcomeAcc, + OutcomePartial = BlockOutcomePartial, > + Unpin, E: Debug + std::error::Error + From + Send + Unpin + 'static, PKeys: Debug + ParamsKeys + Unpin + 'static, - Outcome: Debug + Unpin + Send + Sync + 'static, - OutcomeAcc: Debug + Resource + Unpin + 'static, - OutcomePartial: Debug + Unpin + 'static, + ExecutionOutcome: Debug + Unpin + Send + Sync + 'static, + BlockOutcome: Debug + Unpin + Send + Sync + 'static, + BlockOutcomeAcc: Debug + Resource + Unpin + 'static, + BlockOutcomePartial: Debug + Unpin + 'static, InputT: Debug + Resource + Unpin + 'static, { type Error = E; type PKeys = PKeys; async fn exec( - self: Pin>, + &self, cmd_view: &mut SingleProfileSingleFlowView<'_, Self::Error, Self::PKeys, SetUp>, #[cfg(feature = "output_progress")] progress_tx: Sender, input: Box, @@ -99,14 +156,14 @@ where let input_type_name = tynm::type_name::(); let actual_type_name = Resource::type_name(&*input); panic!( - "Expected to downcast input to `{input_type_name}`.\n\ + "Expected to downcast `input` to `{input_type_name}`.\n\ The actual type name is `{actual_type_name:?}`\n\ This is a bug in the Peace framework." ); }); let cmd_block = &self.cmd_block; - let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded_channel::(); + let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded_channel::(); let mut cmd_outcome = { let outcome = (self.fn_outcome_acc_init)(); let errors = IndexMap::::new(); @@ -150,4 +207,19 @@ where }) }) } + + fn execution_outcome_from(&self, outcome_acc: Box) -> Box { + let outcome_acc = outcome_acc.downcast().unwrap_or_else(|outcome_acc| { + let outcome_acc_type_name = tynm::type_name::(); + let actual_type_name = Resource::type_name(&*outcome_acc); + panic!( + "Expected to downcast `outcome_acc` to `{outcome_acc_type_name}`.\n\ + The actual type name is `{actual_type_name:?}`\n\ + This is a bug in the Peace framework." + ); + }); + let execution_outcome = (self.fn_error_handler)(outcome_acc); + + Box::new(execution_outcome) as Box + } } diff --git a/crate/cmd_rt/src/cmd_execution.rs b/crate/cmd_rt/src/cmd_execution.rs index 2c4eb13be..fb82a25b4 100644 --- a/crate/cmd_rt/src/cmd_execution.rs +++ b/crate/cmd_rt/src/cmd_execution.rs @@ -95,16 +95,11 @@ where } let cmd_outcome_task = async { - let CmdViewAndBlockOutcome { - cmd_view: _cmd_view, - cmd_outcome, - #[cfg(feature = "output_progress")] - progress_tx, - } = stream::unfold(cmd_blocks, |cmd_blocks| { + let cmd_view_and_block_outcome_result = stream::unfold(cmd_blocks, |cmd_blocks| { let cmd_block_next = cmd_blocks.pop_front(); future::ready(cmd_block_next.map(|cmd_block_next| (cmd_block_next, cmd_blocks))) }) - .map(Result::<_, E>::Ok) + .map(Result::<_, ItemErrOrErr<'_, '_, E, PKeys>>::Ok) .try_fold( CmdViewAndBlockOutcome { cmd_view: &mut cmd_view, @@ -129,29 +124,53 @@ where cmd_outcome_previous.value, ); - let block_cmd_outcome = block_cmd_outcome_task.await?; + // `CmdBlock` block logic errors are propagated. + let block_cmd_outcome = + block_cmd_outcome_task.await.map_err(ItemErrOrErr::Error)?; if block_cmd_outcome.is_err() { - todo!( - "We cannot just return this, because the intermediate block \ - cmd outcome may not be the same type as the cmd execution outcome.\n\ - We may need to return the errors without the intermediate block outcome. - " - ); - // return block_cmd_outcome; + // `CmdBlock` outcomes with item errors need to be mapped to the + // `CmdExecution` outcome type, so we still return the item errors. + + let block_cmd_outcome = block_cmd_outcome + .map(|value| cmd_block_rt.execution_outcome_from(value)); + + let cmd_view_and_block_outcome = CmdViewAndBlockOutcome { + cmd_view, + cmd_outcome: block_cmd_outcome, + #[cfg(feature = "output_progress")] + progress_tx, + }; + + Err(ItemErrOrErr::Outcome(cmd_view_and_block_outcome)) + } else { + let cmd_view_and_block_outcome = CmdViewAndBlockOutcome { + cmd_view, + cmd_outcome: block_cmd_outcome, + #[cfg(feature = "output_progress")] + progress_tx, + }; + + Ok(cmd_view_and_block_outcome) } - - let cmd_view_and_block_outcome = CmdViewAndBlockOutcome { - cmd_view, - cmd_outcome: block_cmd_outcome, - #[cfg(feature = "output_progress")] - progress_tx, - }; - - Ok(cmd_view_and_block_outcome) }, ) - .await?; + .await; + + let cmd_view_and_block_outcome = match cmd_view_and_block_outcome_result { + Ok(cmd_view_and_block_outcome) => cmd_view_and_block_outcome, + Err(ItemErrOrErr::Outcome(cmd_view_and_block_outcome)) => { + cmd_view_and_block_outcome + } + Err(ItemErrOrErr::Error(error)) => return Err(error), + }; + + let CmdViewAndBlockOutcome { + cmd_view: _cmd_view, + cmd_outcome, + #[cfg(feature = "output_progress")] + progress_tx, + } = cmd_view_and_block_outcome; #[cfg(feature = "output_progress")] drop(progress_tx); @@ -195,3 +214,12 @@ where #[cfg(feature = "output_progress")] progress_tx: Sender, } + +enum ItemErrOrErr<'view_ref: 'view, 'view, E, PKeys> +where + E: 'static, + PKeys: ParamsKeys + 'static, +{ + Outcome(CmdViewAndBlockOutcome<'view_ref, 'view, E, PKeys>), + Error(E), +} diff --git a/crate/cmd_rt/src/cmd_execution/cmd_execution_builder.rs b/crate/cmd_rt/src/cmd_execution/cmd_execution_builder.rs index 73fb1532c..6e10a196c 100644 --- a/crate/cmd_rt/src/cmd_execution/cmd_execution_builder.rs +++ b/crate/cmd_rt/src/cmd_execution/cmd_execution_builder.rs @@ -37,23 +37,40 @@ where PKeys: Debug + ParamsKeys + Unpin + 'static, Outcome: Debug + Resource + Unpin + 'static, { - pub fn with_cmd_block( + pub fn with_cmd_block< + CB, + ExecutionOutcome, + BlockOutcomeNext, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, + >( self, - cmd_block: CmdBlockWrapper, - ) -> CmdExecutionBuilder + cmd_block: CmdBlockWrapper< + CB, + E, + PKeys, + ExecutionOutcome, + BlockOutcomeNext, + BlockOutcomeAcc, + BlockOutcomePartial, + InputT, + >, + ) -> CmdExecutionBuilder where CB: CmdBlock< Error = E, PKeys = PKeys, - Outcome = OutcomeNext, - OutcomeAcc = OutcomeAcc, - OutcomePartial = OutcomePartial, + Outcome = BlockOutcomeNext, + OutcomeAcc = BlockOutcomeAcc, + OutcomePartial = BlockOutcomePartial, InputT = InputT, > + Unpin + 'static, - OutcomeNext: Debug + Resource + Unpin + 'static, - OutcomeAcc: Debug + Resource + Unpin + 'static, - OutcomePartial: Debug + Unpin + 'static, + ExecutionOutcome: Debug + Resource + Unpin + 'static, + BlockOutcomeNext: Debug + Resource + Unpin + 'static, + BlockOutcomeAcc: Debug + Resource + Unpin + 'static, + BlockOutcomePartial: Debug + Unpin + 'static, InputT: Debug + Resource + Unpin + 'static, { let CmdExecutionBuilder { diff --git a/workspace_tests/src/cmd_rt/cmd_execution.rs b/workspace_tests/src/cmd_rt/cmd_execution.rs index fc9b3464d..29a1f08dd 100644 --- a/workspace_tests/src/cmd_rt/cmd_execution.rs +++ b/workspace_tests/src/cmd_rt/cmd_execution.rs @@ -1,12 +1,13 @@ use peace::{ cfg::{app_name, profile, AppName, FlowId, Profile}, - cmd::ctx::CmdCtx, + cmd::{ctx::CmdCtx, scopes::SingleProfileSingleFlow}, cmd_rt::{CmdBlockWrapper, CmdExecution}, resources::{ internal::StatesMut, + resources::ts::SetUp, states::{ ts::{Current, Goal}, - StateDiffs, + StateDiffs, StatesCurrent, StatesGoal, }, }, rt::cmds::{DiffCmd, StatesDiscoverCmd}, @@ -39,6 +40,13 @@ async fn runs_one_cmd_block() -> Result<(), PeaceTestError> { let states_goal_mut = StatesMut::::new(); (states_current_mut, states_goal_mut) }, + |states_current_and_goal_mut| { + let (states_current_mut, states_goal_mut) = + (states_current_and_goal_mut.0, states_current_and_goal_mut.1); + let states_current = StatesCurrent::from(states_current_mut); + let states_goal = StatesGoal::from(states_goal_mut); + (states_current, states_goal) + }, )) .build(); @@ -66,13 +74,15 @@ async fn runs_one_cmd_block() -> Result<(), PeaceTestError> { || { assert!( matches!( - cmd_outcome.as_ref(), + &cmd_outcome, CmdOutcome { - value: (states_current, states_goal), + value, errors, } - if states_current.len() == 2 - && states_goal.len() == 2 + if { + let (states_current, states_goal) = (&value.0, &value.1); + states_current.len() == 2 && states_goal.len() == 2 + } && errors.is_empty() ), "Expected states_current and states_goal to have 2 items,\n\ @@ -98,6 +108,13 @@ async fn chains_multiple_cmd_blocks() -> Result<(), PeaceTestError> { let states_goal_mut = StatesMut::::new(); (states_current_mut, states_goal_mut) }, + |states_current_and_goal_mut| { + let (states_current_mut, states_goal_mut) = + (states_current_and_goal_mut.0, states_current_and_goal_mut.1); + let states_current = StatesCurrent::from(states_current_mut); + let states_goal = StatesGoal::from(states_goal_mut); + (states_current, states_goal) + }, )) .with_cmd_block(CmdBlockWrapper::new( DiffCmd::< @@ -105,9 +122,16 @@ async fn chains_multiple_cmd_blocks() -> Result<(), PeaceTestError> { PeaceTestError, NoOpOutput, ParamsKeysImpl, - _, + SingleProfileSingleFlow< + '_, + PeaceTestError, + NoOpOutput, + ParamsKeysImpl, + SetUp, + >, >::default(), StateDiffs::new, + |state_diffs| -> StateDiffs { *state_diffs }, )) .build(); @@ -135,7 +159,7 @@ async fn chains_multiple_cmd_blocks() -> Result<(), PeaceTestError> { || { assert!( matches!( - cmd_outcome.as_ref(), + &cmd_outcome, CmdOutcome { value: state_diffs, errors,