diff --git a/README.md b/README.md index a6222644..bcda01d2 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,7 @@ The built-in blocks provided by Protoflow are listed below: | Block | Description | |:------------------|:-------------------------------------------------------------------------------------------------------------------------------| +| [`Add`] | Adds each number in the input stream to a running total. | | [`Buffer`] | Stores all messages it receives. | | [`ConcatStrings`] | Concatenates the received string messages, with an optional delimiter string inserted between each message. | | [`Const`] | Sends a constant value. | @@ -121,12 +122,14 @@ The built-in blocks provided by Protoflow are listed below: | [`DecodeHex`] | Decodes hexadecimal stream to byte stream. | | [`DecodeJSON`] | Decodes JSON messages from a byte stream. | | [`Delay`] | Passes messages through while delaying them by a fixed or random duration. | +| [`Div`] | Divides a running total by each number in the input stream sequentially. | | [`Drop`] | Discards all messages it receives. | | [`Encode`] | Encodes messages to a byte stream. | | [`EncodeCSV`] | Encodes the provided header and rows, given as `prost_types::Value`, into a CSV-formatted byte stream. | | [`EncodeHex`] | Encodes a byte stream into hexadecimal form. | | [`EncodeJSON`] | Encodes messages into JSON format. | | [`Hash`] | Computes the cryptographic hash of a byte stream. | +| [`Mul`] | Multiplies each number in the input stream with a running product. | | [`Random`] | Generates and sends a random value. | | [`ReadDir`] | Reads file names from a file system directory. | | [`ReadEnv`] | Reads the value of an environment variable. | @@ -134,11 +137,34 @@ The built-in blocks provided by Protoflow are listed below: | [`ReadSocket`] | Reads bytes from a TCP socket. | | [`ReadStdin`] | Reads bytes from standard input (aka stdin). | | [`SplitString`] | Splits the received input message, with an optional delimiter string parameter. | +| [`Sub`] | Subtracts each number in the input stream from a running total. | | [`WriteFile`] | Writes or appends bytes to the contents of a file. | | [`WriteSocket`] | Writes bytes to a TCP socket | | [`WriteStderr`] | Writes bytes to standard error (aka stderr). | | [`WriteStdout`] | Writes bytes to standard output (aka stdout). | +#### [`Add`] + +A block that adds each number in the input stream to a running total + +```mermaid +block-beta + columns 7 + Source space:2 Add space:2 Sink + Source-- "input" -->Add + Add-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Add block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Add +``` + #### [`Buffer`] A block that simply stores all messages it receives. @@ -342,6 +368,28 @@ block-beta protoflow execute Delay fixed=2 ``` +#### [`Div`] + +A block that divides a running total by each number in the input stream sequentially + +```mermaid +block-beta + columns 7 + Source space:2 Div space:2 Sink + Source-- "input" -->Div + Div-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Div block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Div +``` + #### [`Drop`] A block that simply discards all messages it receives. @@ -483,6 +531,28 @@ block-beta protoflow execute Hash algorithm=blake3 ``` +#### [`Mul`] + +A block that multiplies each number in the input stream with a running product + +```mermaid +block-beta + columns 7 + Source space:2 Mul space:2 Sink + Source-- "input" -->Mul + Mul-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Mul block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Mul +``` + #### [`Random`] A block for generating and sending a random value. @@ -640,6 +710,28 @@ block-beta protoflow execute SplitString delimiter="," ``` +#### [`Sub`] + +A block subtracts each number in the input stream from a running total + +```mermaid +block-beta + columns 7 + Source space:2 Sub space:2 Sink + Source-- "input" -->Sub + Sub-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Sub block + class Source hidden + class Sink hidden +``` + +```bash +protoflow execute Sub +``` + #### [`WriteFile`] A block that writes or appends bytes to the contents of a file. @@ -794,6 +886,7 @@ To add a new block type implementation, make sure to examine and amend: [`echo_lines`]: lib/protoflow/examples/echo_lines [`examples`]: lib/protoflow/examples +[`Add`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Add.html [`Buffer`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Buffer.html [`ConcatStrings`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ConcatStrings.html [`Const`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Const.html @@ -803,12 +896,14 @@ To add a new block type implementation, make sure to examine and amend: [`DecodeHex`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.DecodeHex.html [`DecodeJSON`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.DecodeJson.html [`Delay`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Delay.html +[`Div`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Div.html [`Drop`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Drop.html [`Encode`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Encode.html [`EncodeCSV`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeCsv.html [`EncodeHex`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeHex.html [`EncodeJSON`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.EncodeJson.html [`Hash`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Hash.html +[`Mul`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Mul.html [`Random`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Random.html [`ReadDir`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadDir.html [`ReadEnv`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadEnv.html @@ -816,6 +911,7 @@ To add a new block type implementation, make sure to examine and amend: [`ReadSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadSocket.html [`ReadStdin`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ReadStdin.html [`SplitString`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.SplitString.html +[`Sub`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Sub.html [`WriteFile`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteFile.html [`WriteSocket`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteSocket.html [`WriteStderr`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.WriteStderr.html diff --git a/lib/protoflow-blocks/Cargo.toml b/lib/protoflow-blocks/Cargo.toml index 52c54740..0c8f2fb9 100644 --- a/lib/protoflow-blocks/Cargo.toml +++ b/lib/protoflow-blocks/Cargo.toml @@ -67,6 +67,7 @@ struson = "0.5" sysml-model = { version = "=0.2.3", default-features = false, optional = true } ubyte = { version = "0.10", default-features = false } csv = "1.3.1" +num-traits = { version = "0.2.19", default-features = false } [dev-dependencies] bytes = "1.8.0" diff --git a/lib/protoflow-blocks/doc/math/add.mmd b/lib/protoflow-blocks/doc/math/add.mmd new file mode 100644 index 00000000..075ab3c1 --- /dev/null +++ b/lib/protoflow-blocks/doc/math/add.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Add space:2 Sink + Source-- "input" -->Add + Add-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Add block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/math/add.seq.mmd b/lib/protoflow-blocks/doc/math/add.seq.mmd new file mode 100644 index 00000000..bab98654 --- /dev/null +++ b/lib/protoflow-blocks/doc/math/add.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Add.input as Add.input port + participant Add as Add block + participant Add.output as Add.output port + participant BlockB as Another block + + BlockA-->>Add: Connect + Add-->>BlockB: Connect + + loop Add process + BlockA->>Add: Message + Add->>Add: Add numbers + Add->>BlockB: Message + end + + BlockA-->>Add: Disconnect + Add-->>Add.input: Close + Add-->>Add.output: Close + Add-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/math/div.mmd b/lib/protoflow-blocks/doc/math/div.mmd new file mode 100644 index 00000000..9d701fbf --- /dev/null +++ b/lib/protoflow-blocks/doc/math/div.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Div space:2 Sink + Source-- "input" -->Div + Div-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Div block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/math/div.seq.mmd b/lib/protoflow-blocks/doc/math/div.seq.mmd new file mode 100644 index 00000000..b5b8e570 --- /dev/null +++ b/lib/protoflow-blocks/doc/math/div.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Div.input as Div.input port + participant Div as Div block + participant Div.output as Div.output port + participant BlockB as Another block + + BlockA-->>Div: Connect + Div-->>BlockB: Connect + + loop Div process + BlockA->>Div: Message + Div->>Div: Div numbers + Div->>BlockB: Message + end + + BlockA-->>Div: Disconnect + Div-->>Div.input: Close + Div-->>Div.output: Close + Div-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/math/mul.mmd b/lib/protoflow-blocks/doc/math/mul.mmd new file mode 100644 index 00000000..d5fdbc9a --- /dev/null +++ b/lib/protoflow-blocks/doc/math/mul.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Mul space:2 Sink + Source-- "input" -->Mul + Mul-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Mul block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/math/mul.seq.mmd b/lib/protoflow-blocks/doc/math/mul.seq.mmd new file mode 100644 index 00000000..750396c9 --- /dev/null +++ b/lib/protoflow-blocks/doc/math/mul.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Mul.input as Mul.input port + participant Mul as Mul block + participant Mul.output as Mul.output port + participant BlockB as Another block + + BlockA-->>Mul: Connect + Mul-->>BlockB: Connect + + loop Mul process + BlockA->>Mul: Message + Mul->>Mul: Mul numbers + Mul->>BlockB: Message + end + + BlockA-->>Mul: Disconnect + Mul-->>Mul.input: Close + Mul-->>Mul.output: Close + Mul-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/doc/math/sub.mmd b/lib/protoflow-blocks/doc/math/sub.mmd new file mode 100644 index 00000000..800f173d --- /dev/null +++ b/lib/protoflow-blocks/doc/math/sub.mmd @@ -0,0 +1,11 @@ +block-beta + columns 7 + Source space:2 Sub space:2 Sink + Source-- "input" -->Sub + Sub-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Sub block + class Source hidden + class Sink hidden diff --git a/lib/protoflow-blocks/doc/math/sub.seq.mmd b/lib/protoflow-blocks/doc/math/sub.seq.mmd new file mode 100644 index 00000000..31087c86 --- /dev/null +++ b/lib/protoflow-blocks/doc/math/sub.seq.mmd @@ -0,0 +1,21 @@ +sequenceDiagram + autonumber + participant BlockA as Another block + participant Sub.input as Sub.input port + participant Sub as Sub block + participant Sub.output as Sub.output port + participant BlockB as Another block + + BlockA-->>Sub: Connect + Sub-->>BlockB: Connect + + loop Sub process + BlockA->>Sub: Message + Sub->>Sub: Sub numbers + Sub->>BlockB: Message + end + + BlockA-->>Sub: Disconnect + Sub-->>Sub.input: Close + Sub-->>Sub.output: Close + Sub-->>BlockB: Disconnect diff --git a/lib/protoflow-blocks/src/block_config.rs b/lib/protoflow-blocks/src/block_config.rs index 8e32c0f0..ea025621 100644 --- a/lib/protoflow-blocks/src/block_config.rs +++ b/lib/protoflow-blocks/src/block_config.rs @@ -79,6 +79,10 @@ impl<'de> serde::Deserialize<'de> for BlockConfig { .unwrap() } + "Add" | "Div" | "Mul" | "Sub" => MathBlockConfig::deserialize(value.clone()) + .map(BlockConfig::Math) + .unwrap(), + #[cfg(feature = "std")] "ReadDir" | "ReadEnv" | "ReadFile" | "ReadSocket" | "ReadStdin" | "WriteFile" | "WriteSocket" | "WriteStderr" | "WriteStdout" => { diff --git a/lib/protoflow-blocks/src/block_tag.rs b/lib/protoflow-blocks/src/block_tag.rs index 8b97ebe7..ecea3505 100644 --- a/lib/protoflow-blocks/src/block_tag.rs +++ b/lib/protoflow-blocks/src/block_tag.rs @@ -34,6 +34,10 @@ pub enum BlockTag { EncodeHex, EncodeJson, // MathBlocks + Add, + Div, + Mul, + Sub, // SysBlocks #[cfg(feature = "std")] ReadDir, @@ -91,6 +95,10 @@ impl BlockTag { Encode => "Encode", EncodeHex => "EncodeHex", EncodeJson => "EncodeJSON", + Add => "Add", + Div => "Div", + Mul => "Mul", + Sub => "Sub", #[cfg(feature = "std")] ReadDir => "ReadDir", #[cfg(feature = "std")] @@ -142,6 +150,10 @@ impl FromStr for BlockTag { "Encode" => Encode, "EncodeHex" => EncodeHex, "EncodeJSON" => EncodeJson, + "Add" => Add, + "Div" => Div, + "Mul" => Mul, + "Sub" => Sub, #[cfg(feature = "std")] "ReadDir" => ReadDir, #[cfg(feature = "std")] @@ -204,6 +216,10 @@ impl BlockInstantiation for BlockTag { Encode => Box::new(super::Encode::::with_system(system, None)), EncodeHex => Box::new(super::EncodeHex::with_system(system)), EncodeJson => Box::new(super::EncodeJson::with_system(system)), + Add => Box::new(super::Add::::with_system(system)), + Div => Box::new(super::Div::::with_system(system)), + Mul => Box::new(super::Mul::::with_system(system)), + Sub => Box::new(super::Sub::::with_system(system)), #[cfg(feature = "std")] ReadDir => Box::new(super::ReadDir::with_system(system)), #[cfg(feature = "std")] diff --git a/lib/protoflow-blocks/src/blocks/math.rs b/lib/protoflow-blocks/src/blocks/math.rs index 6eb3dfea..00883c24 100644 --- a/lib/protoflow-blocks/src/blocks/math.rs +++ b/lib/protoflow-blocks/src/blocks/math.rs @@ -2,29 +2,97 @@ pub mod math { use super::{ - prelude::{Cow, Named}, - BlockConnections, BlockInstantiation, + prelude::{vec, Box, Cow, Named, Vec}, + BlockConnections, BlockInstantiation, InputPortName, OutputPortName, System, }; + use protoflow_core::Block; - pub trait MathBlocks {} + pub trait MathBlocks { + fn add(&mut self) -> Add; + fn div(&mut self) -> Div; + fn mul(&mut self) -> Mul; + fn sub(&mut self) -> Sub; + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] - pub enum MathBlockTag {} + pub enum MathBlockTag { + Add, + Div, + Mul, + Sub, + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug)] - pub enum MathBlockConfig {} + pub enum MathBlockConfig { + Add { + input: InputPortName, + output: OutputPortName, + }, + Div { + input: InputPortName, + output: OutputPortName, + }, + Mul { + input: InputPortName, + output: OutputPortName, + }, + Sub { + input: InputPortName, + output: OutputPortName, + }, + } impl Named for MathBlockConfig { fn name(&self) -> Cow { - unreachable!() + use MathBlockConfig::*; + Cow::Borrowed(match self { + Add { .. } => "Add", + Div { .. } => "Div", + Mul { .. } => "Mul", + Sub { .. } => "Sub", + }) + } + } + + impl BlockConnections for MathBlockConfig { + fn output_connections(&self) -> Vec<(&'static str, Option)> { + use MathBlockConfig::*; + match self { + Add { output, .. } + | Div { output, .. } + | Mul { output, .. } + | Sub { output, .. } => { + vec![("output", Some(output.clone()))] + } + } } } - impl BlockConnections for MathBlockConfig {} + impl BlockInstantiation for MathBlockConfig { + fn instantiate(&self, system: &mut System) -> Box { + use MathBlockConfig::*; + match self { + Add { .. } => Box::new(super::Add::::with_system(system)), + Div { .. } => Box::new(super::Div::::with_system(system)), + Mul { .. } => Box::new(super::Mul::::with_system(system)), + Sub { .. } => Box::new(super::Sub::::with_system(system)), + } + } + } + + mod add; + pub use add::*; + + mod div; + pub use div::*; + + mod mul; + pub use mul::*; - impl BlockInstantiation for MathBlockConfig {} + mod sub; + pub use sub::*; } pub use math::*; diff --git a/lib/protoflow-blocks/src/blocks/math/.gitkeep b/lib/protoflow-blocks/src/blocks/math/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/protoflow-blocks/src/blocks/math/add.rs b/lib/protoflow-blocks/src/blocks/math/add.rs new file mode 100644 index 00000000..53c859be --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/math/add.rs @@ -0,0 +1,150 @@ +// This is free and unencumbered software released into the public domain. + +extern crate std; + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +pub trait Addable: Message + core::ops::Add + num_traits::Zero {} +impl + num_traits::Zero> Addable for T {} + +/// A block that adds numbers. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/math/add.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/math/add.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let config = StdioConfig { +/// encoding: Default::default(), +/// params: Default::default(), +/// }; +/// let input = s.read_stdin(); +/// let decode = s.decode_with::(config.encoding); +/// let add = s.add(); +/// let encode = s.encode_with::(config.encoding); +/// let output = config.write_stdout(s); +/// s.connect(&input.output, &decode.input); +/// s.connect(&decode.output, &add.input); +/// s.connect(&add.output, &encode.input); +/// s.connect(&encode.output, &output.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Add +/// ``` +/// +#[derive(Block, Clone)] +pub struct Add { + /// The input number stream. + #[input] + pub input: InputPort, + /// The output port to send the result on. + #[output] + pub output: OutputPort, +} + +impl Add { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } + + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Add { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + let mut sum = T::zero(); + while let Some(input) = self.input.recv()? { + sum = sum + input; + } + self.output.send(&sum)?; + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Add { + fn build_system(config: StdioConfig) -> Result { + use crate::{IoBlocks, MathBlocks, SysBlocks, SystemBuilding}; + + config.reject_any()?; + + Ok(System::build(|s| { + let input = s.read_stdin(); + let decode = s.decode_with::(config.encoding); + let add = s.add(); + let encode = s.encode_with::(config.encoding); + let output = config.write_stdout(s); + s.connect(&input.output, &decode.input); + s.connect(&decode.output, &add.input); + s.connect(&add.output, &encode.input); + s.connect(&encode.output, &output.input); + })) + } +} + +#[cfg(test)] +mod tests { + + use protoflow_core::{runtimes::StdRuntime, transports::MpscTransport}; + + use super::Add; + use crate::{System, SystemBuilding, SystemExecution}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Add::::new(s.input(), s.output())); + let _ = s.block(Add::::new(s.input(), s.output())); + + let _ = s.block(Add::::new(s.input(), s.output())); + let _ = s.block(Add::::new(s.input(), s.output())); + + let _ = s.block(Add::::new(s.input(), s.output())); + let _ = s.block(Add::::new(s.input(), s.output())); + }); + } + + #[test] + fn run_block() { + let mut s = System::new(&StdRuntime::new(MpscTransport::new()).unwrap()); + + let mut values = s.output(); + let add = s.block(Add::::with_system(&s)); + let result = s.input(); + + assert!(s.connect(&values, &add.input)); + assert!(s.connect(&add.output, &result)); + + let exec = s.execute().unwrap(); + + values.send(&3).unwrap(); + values.send(&5).unwrap(); + values.close().unwrap(); + + assert_eq!(Ok(Some(8)), result.recv()); + + exec.join().unwrap(); + } +} diff --git a/lib/protoflow-blocks/src/blocks/math/div.rs b/lib/protoflow-blocks/src/blocks/math/div.rs new file mode 100644 index 00000000..83bf3306 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/math/div.rs @@ -0,0 +1,166 @@ +// This is free and unencumbered software released into the public domain. + +extern crate std; + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +pub trait Divisible: + Message + core::ops::Div + PartialEq + num_traits::One + num_traits::Zero +{ +} +impl + PartialEq + num_traits::One + num_traits::Zero> + Divisible for T +{ +} + +/// A block that divides numbers. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/math/div.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/math/div.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let config = StdioConfig { +/// encoding: Default::default(), +/// params: Default::default(), +/// }; +/// let input = s.read_stdin(); +/// let decode = s.decode_with::(config.encoding); +/// let div = s.div(); +/// let encode = s.encode_with::(config.encoding); +/// let output = config.write_stdout(s); +/// s.connect(&input.output, &decode.input); +/// s.connect(&decode.output, &div.input); +/// s.connect(&div.output, &encode.input); +/// s.connect(&encode.output, &output.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Div +/// ``` +/// +#[derive(Block, Clone)] +pub struct Div { + /// The input number stream. + #[input] + pub input: InputPort, + /// The output port to send the result on. + #[output] + pub output: OutputPort, +} + +impl Div { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } + + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Div { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + let result = if let Some(mut result) = self.input.recv()? { + while let Some(input) = self.input.recv()? { + if input == T::zero() { + return Err(BlockError::Other("Division by zero".into())); + } + result = result / input; + } + result + } else { + T::one() + }; + + self.output.send(&result)?; + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Div { + fn build_system(config: StdioConfig) -> Result { + use crate::{IoBlocks, MathBlocks, SysBlocks, SystemBuilding}; + + config.reject_any()?; + + Ok(System::build(|s| { + let input = s.read_stdin(); + let decode = s.decode_with::(config.encoding); + let div = s.div(); + let encode = s.encode_with::(config.encoding); + let output = config.write_stdout(s); + s.connect(&input.output, &decode.input); + s.connect(&decode.output, &div.input); + s.connect(&div.output, &encode.input); + s.connect(&encode.output, &output.input); + })) + } +} + +#[cfg(test)] +mod tests { + + use protoflow_core::{runtimes::StdRuntime, transports::MpscTransport, SystemExecution}; + + use super::Div; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Div::::new(s.input(), s.output())); + let _ = s.block(Div::::new(s.input(), s.output())); + + let _ = s.block(Div::::new(s.input(), s.output())); + let _ = s.block(Div::::new(s.input(), s.output())); + + let _ = s.block(Div::::new(s.input(), s.output())); + let _ = s.block(Div::::new(s.input(), s.output())); + }); + } + + #[test] + fn run_block() { + let mut s = System::new(&StdRuntime::new(MpscTransport::new()).unwrap()); + + let mut values = s.output(); + let div = s.block(Div::::with_system(&s)); + let result = s.input(); + + assert!(s.connect(&values, &div.input)); + assert!(s.connect(&div.output, &result)); + + let exec = s.execute().unwrap(); + + values.send(&3.0).unwrap(); + values.send(&5.0).unwrap(); + values.close().unwrap(); + + assert_eq!(Ok(Some(3.0 / 5.0)), result.recv()); + + exec.join().unwrap(); + } +} diff --git a/lib/protoflow-blocks/src/blocks/math/mul.rs b/lib/protoflow-blocks/src/blocks/math/mul.rs new file mode 100644 index 00000000..4ffc7471 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/math/mul.rs @@ -0,0 +1,151 @@ +// This is free and unencumbered software released into the public domain. + +extern crate std; + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +pub trait Multipliable: Message + core::ops::Mul + num_traits::One {} +impl + num_traits::One> Multipliable for T {} + +/// A block that multiplies numbers. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/math/mul.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/math/mul.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```no_run +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let config = StdioConfig { +/// encoding: Default::default(), +/// params: Default::default(), +/// }; +/// let input = s.read_stdin(); +/// let decode = s.decode_with::(config.encoding); +/// let mul = s.mul(); +/// let encode = s.encode_with::(config.encoding); +/// let output = config.write_stdout(s); +/// s.connect(&input.output, &decode.input); +/// s.connect(&decode.output, &mul.input); +/// s.connect(&mul.output, &encode.input); +/// s.connect(&encode.output, &output.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Mul +/// ``` +/// +#[derive(Block, Clone)] +pub struct Mul { + /// The input number stream. + #[input] + pub input: InputPort, + /// The output port to send the result on. + #[output] + pub output: OutputPort, +} + +impl Mul { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } + + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Mul { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + let mut product = T::one(); + while let Some(input) = self.input.recv()? { + product = product * input; + } + + self.output.send(&product)?; + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Mul { + fn build_system(config: StdioConfig) -> Result { + use crate::{IoBlocks, MathBlocks, SysBlocks, SystemBuilding}; + + config.reject_any()?; + + Ok(System::build(|s| { + let input = s.read_stdin(); + let decode = s.decode_with::(config.encoding); + let mul = s.mul(); + let encode = s.encode_with::(config.encoding); + let output = config.write_stdout(s); + s.connect(&input.output, &decode.input); + s.connect(&decode.output, &mul.input); + s.connect(&mul.output, &encode.input); + s.connect(&encode.output, &output.input); + })) + } +} + +#[cfg(test)] +mod tests { + + use protoflow_core::{runtimes::StdRuntime, transports::MpscTransport}; + + use super::Mul; + use crate::{System, SystemBuilding, SystemExecution}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Mul::::new(s.input(), s.output())); + let _ = s.block(Mul::::new(s.input(), s.output())); + + let _ = s.block(Mul::::new(s.input(), s.output())); + let _ = s.block(Mul::::new(s.input(), s.output())); + + let _ = s.block(Mul::::new(s.input(), s.output())); + let _ = s.block(Mul::::new(s.input(), s.output())); + }); + } + + #[test] + fn run_block() { + let mut s = System::new(&StdRuntime::new(MpscTransport::new()).unwrap()); + + let mut values = s.output(); + let mul = s.block(Mul::::with_system(&s)); + let result = s.input(); + + assert!(s.connect(&values, &mul.input)); + assert!(s.connect(&mul.output, &result)); + + let exec = s.execute().unwrap(); + + values.send(&3).unwrap(); + values.send(&5).unwrap(); + values.close().unwrap(); + + assert_eq!(Ok(Some(15)), result.recv()); + + exec.join().unwrap(); + } +} diff --git a/lib/protoflow-blocks/src/blocks/math/sub.rs b/lib/protoflow-blocks/src/blocks/math/sub.rs new file mode 100644 index 00000000..afe29be0 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/math/sub.rs @@ -0,0 +1,154 @@ +// This is free and unencumbered software released into the public domain. + +extern crate std; + +use crate::{StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +pub trait Subtractable: Message + core::ops::Sub + num_traits::Zero {} +impl + num_traits::Zero> Subtractable for T {} + +/// A block that subtracts numbers. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/math/sub.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/math/sub.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let config = StdioConfig { +/// encoding: Default::default(), +/// params: Default::default(), +/// }; +/// let input = s.read_stdin(); +/// let decode = s.decode_with::(config.encoding); +/// let sub = s.sub(); +/// let encode = s.encode_with::(config.encoding); +/// let output = config.write_stdout(s); +/// s.connect(&input.output, &decode.input); +/// s.connect(&decode.output, &sub.input); +/// s.connect(&sub.output, &encode.input); +/// s.connect(&encode.output, &output.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Sub +/// ``` +/// +#[derive(Block, Clone)] +pub struct Sub { + /// The input number stream. + #[input] + pub input: InputPort, + /// The output port to send the result on. + #[output] + pub output: OutputPort, +} + +impl Sub { + pub fn new(input: InputPort, output: OutputPort) -> Self { + Self { input, output } + } + + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.output()) + } +} + +impl Block for Sub { + fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult { + let result = if let Some(mut result) = self.input.recv()? { + while let Some(input) = self.input.recv()? { + result = result - input + } + result + } else { + T::zero() + }; + self.output.send(&result)?; + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Sub { + fn build_system(config: StdioConfig) -> Result { + use crate::{IoBlocks, MathBlocks, SysBlocks, SystemBuilding}; + + config.reject_any()?; + + Ok(System::build(|s| { + let input = s.read_stdin(); + let decode = s.decode_with::(config.encoding); + let sub = s.sub(); + let encode = s.encode_with::(config.encoding); + let output = config.write_stdout(s); + s.connect(&input.output, &decode.input); + s.connect(&decode.output, &sub.input); + s.connect(&sub.output, &encode.input); + s.connect(&encode.output, &output.input); + })) + } +} + +#[cfg(test)] +mod tests { + + use protoflow_core::{runtimes::StdRuntime, transports::MpscTransport, SystemExecution}; + + use super::Sub; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Sub::::new(s.input(), s.output())); + let _ = s.block(Sub::::new(s.input(), s.output())); + + let _ = s.block(Sub::::new(s.input(), s.output())); + let _ = s.block(Sub::::new(s.input(), s.output())); + + let _ = s.block(Sub::::new(s.input(), s.output())); + let _ = s.block(Sub::::new(s.input(), s.output())); + }); + } + + #[test] + fn run_block() { + let mut s = System::new(&StdRuntime::new(MpscTransport::new()).unwrap()); + + let mut values = s.output(); + let sub = s.block(Sub::::with_system(&s)); + let result = s.input(); + + assert!(s.connect(&values, &sub.input)); + assert!(s.connect(&sub.output, &result)); + + let exec = s.execute().unwrap(); + + values.send(&3).unwrap(); + values.send(&5).unwrap(); + values.close().unwrap(); + + assert_eq!(Ok(Some(-2)), result.recv()); + + exec.join().unwrap(); + } +} diff --git a/lib/protoflow-blocks/src/blocks/text/.gitkeep b/lib/protoflow-blocks/src/blocks/text/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 30050a0b..28181cc0 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -76,6 +76,10 @@ pub fn build_stdio_system( "EncodeHex" => EncodeHex::build_system(config)?, "EncodeJSON" => EncodeJson::build_system(config)?, // MathBlocks + "Add" => Add::::build_system(config)?, + "Div" => Div::::build_system(config)?, + "Mul" => Mul::::build_system(config)?, + "Sub" => Sub::::build_system(config)?, // SysBlocks "ReadDir" => ReadDir::build_system(config)?, "ReadEnv" => ReadEnv::::build_system(config)?, diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 695ccfba..b6e56a85 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -5,10 +5,10 @@ use crate::{ prelude::{fmt, Arc, Box, FromStr, Rc, String, ToString}, types::{DelayType, Encoding}, - AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, - DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadSocket, ReadStdin, SplitString, - SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, WriteStdout, + Add, AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, + DecodeJson, Delay, Div, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, + IoBlocks, MathBlocks, Mul, Random, ReadDir, ReadEnv, ReadFile, ReadSocket, ReadStdin, + SplitString, Sub, SysBlocks, TextBlocks, WriteFile, WriteSocket, WriteStderr, WriteStdout, }; use protoflow_core::{ Block, BlockID, BlockResult, BoxedBlockType, InputPort, Message, OutputPort, PortID, @@ -239,7 +239,23 @@ impl IoBlocks for System { } } -impl MathBlocks for System {} +impl MathBlocks for System { + fn add(&mut self) -> Add { + self.0.block(Add::with_system(self)) + } + + fn div(&mut self) -> Div { + self.0.block(Div::with_system(self)) + } + + fn mul(&mut self) -> Mul { + self.0.block(Mul::with_system(self)) + } + + fn sub(&mut self) -> Sub { + self.0.block(Sub::with_system(self)) + } +} #[cfg(not(feature = "std"))] impl SysBlocks for System {}