Skip to content

Commit

Permalink
Take in error handler that maps BlockOutcomeAcc to ExecutionOutcome.
Browse files Browse the repository at this point in the history
  • Loading branch information
azriel91 committed Aug 25, 2023
1 parent 133664b commit 161fda9
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 70 deletions.
2 changes: 2 additions & 0 deletions crate/cmd_rt/src/cmd_block/cmd_block_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ where
E: Debug,
{
/// Error originated from `CmdBlock` code.
#[error("`CmdBlock` block logic failed.")]
Block {
/// Outcome accumulator at the point of error.
outcome_acc: OutcomeAcc,
/// Error that occurred.
error: E,
},
/// Error originated from at least one item.
#[error("`CmdBlock` item logic failed.")]
Item {
/// Outcome accumulator at the point of error.
outcome_acc: OutcomeAcc,
Expand Down
11 changes: 9 additions & 2 deletions crate/cmd_rt/src/cmd_block/cmd_block_rt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, pin::Pin};
use std::fmt::Debug;

use async_trait::async_trait;
use peace_cmd::scopes::SingleProfileSingleFlowView;
Expand All @@ -24,9 +24,16 @@ pub trait CmdBlockRt: Debug + Unpin {

/// Executes this command block.
async fn exec(
self: Pin<Box<Self>>,
&self,
view: &mut SingleProfileSingleFlowView<'_, Self::Error, Self::PKeys, SetUp>,
#[cfg(feature = "output_progress")] progress_tx: Sender<ProgressUpdateAndId>,
input: Box<dyn Resource>,
) -> Result<CmdOutcome<Box<dyn Resource>, Self::Error>, Self::Error>;

/// Runs the error handler and maps the `CmdBlock`'s `Outcome` to
/// `CmdExecution::Outcome`.
///
/// This allows a `Cmd` to run logic to map an intermediate `CmdBlock`s
/// outcome which contains item failures, to the `CmdExection` outcome type.
fn execution_outcome_from(&self, outcome_acc: Box<dyn Resource>) -> Box<dyn Resource>;
}
124 changes: 98 additions & 26 deletions crate/cmd_rt/src/cmd_block/cmd_block_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, marker::PhantomData, pin::Pin};
use std::{fmt::Debug, marker::PhantomData};

use async_trait::async_trait;
use peace_cfg::ItemId;
Expand All @@ -21,76 +21,133 @@ cfg_if::cfg_if! {
///
/// [`CmdBlockRt`]: crate::CmdBlockRt
#[derive(Debug)]
pub struct CmdBlockWrapper<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT> {
pub struct CmdBlockWrapper<
CB,
E,
PKeys,
ExecutionOutcome,
BlockOutcome,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
> {
/// Underlying `CmdBlock` implementation.
///
/// The trait constraints are applied on impl blocks.
cmd_block: CB,
/// Seed function for `OutcomeAcc`.
fn_outcome_acc_init: fn() -> OutcomeAcc,
fn_outcome_acc_init: fn() -> BlockOutcomeAcc,
/// Function to run if an item failure happens while executing this
/// `CmdBlock`.
fn_error_handler: fn(Box<BlockOutcomeAcc>) -> ExecutionOutcome,
/// Marker.
marker: PhantomData<(E, PKeys, Outcome, OutcomePartial, InputT)>,
marker: PhantomData<(E, PKeys, BlockOutcome, BlockOutcomePartial, InputT)>,
}

impl<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT>
CmdBlockWrapper<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT>
impl<CB, E, PKeys, ExecutionOutcome, BlockOutcome, BlockOutcomeAcc, BlockOutcomePartial, InputT>
CmdBlockWrapper<
CB,
E,
PKeys,
ExecutionOutcome,
BlockOutcome,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
>
where
CB: CmdBlock<
Error = E,
PKeys = PKeys,
OutcomeAcc = OutcomeAcc,
OutcomePartial = OutcomePartial,
OutcomeAcc = BlockOutcomeAcc,
OutcomePartial = BlockOutcomePartial,
InputT = InputT,
>,
{
pub fn new(cmd_block: CB, fn_outcome_acc_init: fn() -> OutcomeAcc) -> Self {
pub fn new(
cmd_block: CB,
fn_outcome_acc_init: fn() -> BlockOutcomeAcc,
fn_error_handler: fn(Box<BlockOutcomeAcc>) -> ExecutionOutcome,
) -> Self {
Self {
cmd_block,
fn_outcome_acc_init,
fn_error_handler,
marker: PhantomData,
}
}
}

impl<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT> From<(CB, fn() -> OutcomeAcc)>
for CmdBlockWrapper<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT>
impl<CB, E, PKeys, ExecutionOutcome, BlockOutcome, BlockOutcomeAcc, BlockOutcomePartial, InputT>
From<(
CB,
fn() -> BlockOutcomeAcc,
fn(Box<BlockOutcomeAcc>) -> ExecutionOutcome,
)>
for CmdBlockWrapper<
CB,
E,
PKeys,
ExecutionOutcome,
BlockOutcome,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
>
where
CB: CmdBlock<
Error = E,
PKeys = PKeys,
OutcomeAcc = OutcomeAcc,
OutcomePartial = OutcomePartial,
OutcomeAcc = BlockOutcomeAcc,
OutcomePartial = BlockOutcomePartial,
InputT = InputT,
>,
{
fn from((cmd_block, fn_outcome_acc_init): (CB, fn() -> OutcomeAcc)) -> Self {
Self::new(cmd_block, fn_outcome_acc_init)
fn from(
(cmd_block, fn_outcome_acc_init, fn_error_handler): (
CB,
fn() -> BlockOutcomeAcc,
fn(Box<BlockOutcomeAcc>) -> ExecutionOutcome,
),
) -> Self {
Self::new(cmd_block, fn_outcome_acc_init, fn_error_handler)
}
}

#[async_trait(?Send)]
impl<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT> CmdBlockRt
for CmdBlockWrapper<CB, E, PKeys, Outcome, OutcomeAcc, OutcomePartial, InputT>
impl<CB, E, PKeys, ExecutionOutcome, BlockOutcome, BlockOutcomeAcc, BlockOutcomePartial, InputT>
CmdBlockRt
for CmdBlockWrapper<
CB,
E,
PKeys,
ExecutionOutcome,
BlockOutcome,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
>
where
CB: CmdBlock<
Error = E,
PKeys = PKeys,
Outcome = Outcome,
OutcomeAcc = OutcomeAcc,
OutcomePartial = OutcomePartial,
Outcome = BlockOutcome,
OutcomeAcc = BlockOutcomeAcc,
OutcomePartial = BlockOutcomePartial,
> + Unpin,
E: Debug + std::error::Error + From<peace_rt_model::Error> + Send + Unpin + 'static,
PKeys: Debug + ParamsKeys + Unpin + 'static,
Outcome: Debug + Unpin + Send + Sync + 'static,
OutcomeAcc: Debug + Resource + Unpin + 'static,
OutcomePartial: Debug + Unpin + 'static,
ExecutionOutcome: Debug + Unpin + Send + Sync + 'static,
BlockOutcome: Debug + Unpin + Send + Sync + 'static,
BlockOutcomeAcc: Debug + Resource + Unpin + 'static,
BlockOutcomePartial: Debug + Unpin + 'static,
InputT: Debug + Resource + Unpin + 'static,
{
type Error = E;
type PKeys = PKeys;

async fn exec(
self: Pin<Box<Self>>,
&self,
cmd_view: &mut SingleProfileSingleFlowView<'_, Self::Error, Self::PKeys, SetUp>,
#[cfg(feature = "output_progress")] progress_tx: Sender<ProgressUpdateAndId>,
input: Box<dyn Resource>,
Expand All @@ -99,14 +156,14 @@ where
let input_type_name = tynm::type_name::<InputT>();
let actual_type_name = Resource::type_name(&*input);
panic!(
"Expected to downcast input to `{input_type_name}`.\n\
"Expected to downcast `input` to `{input_type_name}`.\n\
The actual type name is `{actual_type_name:?}`\n\
This is a bug in the Peace framework."
);
});
let cmd_block = &self.cmd_block;

let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded_channel::<OutcomePartial>();
let (outcomes_tx, mut outcomes_rx) = mpsc::unbounded_channel::<BlockOutcomePartial>();
let mut cmd_outcome = {
let outcome = (self.fn_outcome_acc_init)();
let errors = IndexMap::<ItemId, E>::new();
Expand Down Expand Up @@ -150,4 +207,19 @@ where
})
})
}

fn execution_outcome_from(&self, outcome_acc: Box<dyn Resource>) -> Box<dyn Resource> {
let outcome_acc = outcome_acc.downcast().unwrap_or_else(|outcome_acc| {
let outcome_acc_type_name = tynm::type_name::<BlockOutcome>();
let actual_type_name = Resource::type_name(&*outcome_acc);
panic!(
"Expected to downcast `outcome_acc` to `{outcome_acc_type_name}`.\n\
The actual type name is `{actual_type_name:?}`\n\
This is a bug in the Peace framework."
);
});
let execution_outcome = (self.fn_error_handler)(outcome_acc);

Box::new(execution_outcome) as Box<dyn Resource>
}
}
78 changes: 53 additions & 25 deletions crate/cmd_rt/src/cmd_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,11 @@ where
}

let cmd_outcome_task = async {
let CmdViewAndBlockOutcome {
cmd_view: _cmd_view,
cmd_outcome,
#[cfg(feature = "output_progress")]
progress_tx,
} = stream::unfold(cmd_blocks, |cmd_blocks| {
let cmd_view_and_block_outcome_result = stream::unfold(cmd_blocks, |cmd_blocks| {
let cmd_block_next = cmd_blocks.pop_front();
future::ready(cmd_block_next.map(|cmd_block_next| (cmd_block_next, cmd_blocks)))
})
.map(Result::<_, E>::Ok)
.map(Result::<_, ItemErrOrErr<'_, '_, E, PKeys>>::Ok)
.try_fold(
CmdViewAndBlockOutcome {
cmd_view: &mut cmd_view,
Expand All @@ -129,29 +124,53 @@ where
cmd_outcome_previous.value,
);

let block_cmd_outcome = block_cmd_outcome_task.await?;
// `CmdBlock` block logic errors are propagated.
let block_cmd_outcome =
block_cmd_outcome_task.await.map_err(ItemErrOrErr::Error)?;

if block_cmd_outcome.is_err() {
todo!(
"We cannot just return this, because the intermediate block \
cmd outcome may not be the same type as the cmd execution outcome.\n\
We may need to return the errors without the intermediate block outcome.
"
);
// return block_cmd_outcome;
// `CmdBlock` outcomes with item errors need to be mapped to the
// `CmdExecution` outcome type, so we still return the item errors.

let block_cmd_outcome = block_cmd_outcome
.map(|value| cmd_block_rt.execution_outcome_from(value));

let cmd_view_and_block_outcome = CmdViewAndBlockOutcome {
cmd_view,
cmd_outcome: block_cmd_outcome,
#[cfg(feature = "output_progress")]
progress_tx,
};

Err(ItemErrOrErr::Outcome(cmd_view_and_block_outcome))
} else {
let cmd_view_and_block_outcome = CmdViewAndBlockOutcome {
cmd_view,
cmd_outcome: block_cmd_outcome,
#[cfg(feature = "output_progress")]
progress_tx,
};

Ok(cmd_view_and_block_outcome)
}

let cmd_view_and_block_outcome = CmdViewAndBlockOutcome {
cmd_view,
cmd_outcome: block_cmd_outcome,
#[cfg(feature = "output_progress")]
progress_tx,
};

Ok(cmd_view_and_block_outcome)
},
)
.await?;
.await;

let cmd_view_and_block_outcome = match cmd_view_and_block_outcome_result {
Ok(cmd_view_and_block_outcome) => cmd_view_and_block_outcome,
Err(ItemErrOrErr::Outcome(cmd_view_and_block_outcome)) => {
cmd_view_and_block_outcome
}
Err(ItemErrOrErr::Error(error)) => return Err(error),
};

let CmdViewAndBlockOutcome {
cmd_view: _cmd_view,
cmd_outcome,
#[cfg(feature = "output_progress")]
progress_tx,
} = cmd_view_and_block_outcome;

#[cfg(feature = "output_progress")]
drop(progress_tx);
Expand Down Expand Up @@ -195,3 +214,12 @@ where
#[cfg(feature = "output_progress")]
progress_tx: Sender<ProgressUpdateAndId>,
}

enum ItemErrOrErr<'view_ref: 'view, 'view, E, PKeys>
where
E: 'static,
PKeys: ParamsKeys + 'static,
{
Outcome(CmdViewAndBlockOutcome<'view_ref, 'view, E, PKeys>),
Error(E),
}
35 changes: 26 additions & 9 deletions crate/cmd_rt/src/cmd_execution/cmd_execution_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,40 @@ where
PKeys: Debug + ParamsKeys + Unpin + 'static,
Outcome: Debug + Resource + Unpin + 'static,
{
pub fn with_cmd_block<CB, OutcomeNext, OutcomeAcc, OutcomePartial, InputT>(
pub fn with_cmd_block<
CB,
ExecutionOutcome,
BlockOutcomeNext,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
>(
self,
cmd_block: CmdBlockWrapper<CB, E, PKeys, OutcomeNext, OutcomeAcc, OutcomePartial, InputT>,
) -> CmdExecutionBuilder<E, PKeys, OutcomeNext>
cmd_block: CmdBlockWrapper<
CB,
E,
PKeys,
ExecutionOutcome,
BlockOutcomeNext,
BlockOutcomeAcc,
BlockOutcomePartial,
InputT,
>,
) -> CmdExecutionBuilder<E, PKeys, ExecutionOutcome>
where
CB: CmdBlock<
Error = E,
PKeys = PKeys,
Outcome = OutcomeNext,
OutcomeAcc = OutcomeAcc,
OutcomePartial = OutcomePartial,
Outcome = BlockOutcomeNext,
OutcomeAcc = BlockOutcomeAcc,
OutcomePartial = BlockOutcomePartial,
InputT = InputT,
> + Unpin
+ 'static,
OutcomeNext: Debug + Resource + Unpin + 'static,
OutcomeAcc: Debug + Resource + Unpin + 'static,
OutcomePartial: Debug + Unpin + 'static,
ExecutionOutcome: Debug + Resource + Unpin + 'static,
BlockOutcomeNext: Debug + Resource + Unpin + 'static,
BlockOutcomeAcc: Debug + Resource + Unpin + 'static,
BlockOutcomePartial: Debug + Unpin + 'static,
InputT: Debug + Resource + Unpin + 'static,
{
let CmdExecutionBuilder {
Expand Down
Loading

0 comments on commit 161fda9

Please sign in to comment.