From 8e16efa82605b480636ab303de1b313a0226a1b7 Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Wed, 18 Dec 2024 14:00:40 +0200 Subject: [PATCH 1/3] add Gate block --- README.md | 28 ++++ lib/protoflow-blocks/doc/flow/gate.mmd | 16 +++ lib/protoflow-blocks/doc/flow/gate.seq.mmd | 32 +++++ lib/protoflow-blocks/src/blocks/flow.rs | 57 ++++++-- lib/protoflow-blocks/src/blocks/flow/gate.rs | 132 +++++++++++++++++++ lib/protoflow-blocks/src/lib.rs | 1 + lib/protoflow-blocks/src/system.rs | 12 +- 7 files changed, 266 insertions(+), 12 deletions(-) create mode 100644 lib/protoflow-blocks/doc/flow/gate.mmd create mode 100644 lib/protoflow-blocks/doc/flow/gate.seq.mmd create mode 100644 lib/protoflow-blocks/src/blocks/flow/gate.rs diff --git a/README.md b/README.md index a6222644..ec2730d5 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ The built-in blocks provided by Protoflow are listed below: | [`EncodeCSV`] | Encodes the provided header and rows, given as `prost_types::Value`, into a CSV-formatted byte stream. | | [`EncodeHex`] | Encodes a byte stream into hexadecimal form. | | [`EncodeJSON`] | Encodes messages into JSON format. | +| [`Gate`] | Keeps all messages it receives, and sends them downstream when triggered. | | [`Hash`] | Computes the cryptographic hash of a byte stream. | | [`Random`] | Generates and sends a random value. | | [`ReadDir`] | Reads file names from a file system directory. | @@ -455,6 +456,33 @@ block-beta protoflow execute EncodeJSON ``` +#### [`Gate`] + +A block that keeps all messages it receives, and sends them downstream when triggered. + +```mermaid +block-beta + columns 7 + Source space:2 Count space:2 Sink + space:7 + space:7 + space:3 Pulse space:3 + Source-- "input" -->Gate + Pulse-- "trigger" -->Gate + Gate-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Gate block + class Source hidden + class Sink hidden + class Pulse hidden +``` + +```bash +protoflow execute Gate +``` + #### [`Hash`] A block that computes the cryptographic hash of a byte stream, while optionally diff --git a/lib/protoflow-blocks/doc/flow/gate.mmd b/lib/protoflow-blocks/doc/flow/gate.mmd new file mode 100644 index 00000000..8c54c559 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/gate.mmd @@ -0,0 +1,16 @@ +block-beta + columns 7 + Source space:2 Count space:2 Sink + space:7 + space:7 + space:3 Pulse space:3 + Source-- "input" -->Gate + Pulse-- "trigger" -->Gate + Gate-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Gate block + class Source hidden + class Sink hidden + class Pulse hidden diff --git a/lib/protoflow-blocks/doc/flow/gate.seq.mmd b/lib/protoflow-blocks/doc/flow/gate.seq.mmd new file mode 100644 index 00000000..f99ae997 --- /dev/null +++ b/lib/protoflow-blocks/doc/flow/gate.seq.mmd @@ -0,0 +1,32 @@ +sequenceDiagram + autonumber + participant BlockA as Another block (input source) + participant Gate.input as Gate.input port + participant Gate as Gate block + participant Gate.output as Gate.output port + participant BlockB as Another block (downstream sink) + participant Gate.trigger as Gate.trigger port + participant BlockC as Another block (trigger source) + + BlockA-->>Gate: Connect (input) + BlockC-->>Gate: Connect (trigger) + Gate-->>BlockB: Connect (output) + + loop Storing messages + BlockA->>Gate: Message + Gate->>Gate: Store message internally + end + + BlockC->>Gate: Trigger + loop Releasing messages + Gate->>BlockB: Stored Message + end + + BlockA-->>Gate: Disconnect (input) + Gate-->>Gate.input: Close + + BlockC-->>Gate: Disconnect (trigger) + Gate-->>Gate.trigger: Close + + Gate-->>BlockB: Disconnect (output) + Gate-->>Gate.output: Close diff --git a/lib/protoflow-blocks/src/blocks/flow.rs b/lib/protoflow-blocks/src/blocks/flow.rs index 21496e25..911c7ed6 100644 --- a/lib/protoflow-blocks/src/blocks/flow.rs +++ b/lib/protoflow-blocks/src/blocks/flow.rs @@ -1,30 +1,71 @@ // This is free and unencumbered software released into the public domain. pub mod flow { + use crate::{InputPortName, OutputPortName}; + use super::{ - prelude::{Cow, Named}, - BlockConnections, BlockInstantiation, + prelude::{vec, Box, Cow, Named, Vec}, + BlockConnections, BlockInstantiation, System, }; - pub trait FlowBlocks {} + use protoflow_core::{Block, Message}; + + pub trait FlowBlocks { + fn gate(&mut self) -> Gate; + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] - pub enum FlowBlockTag {} + pub enum FlowBlockTag { + Gate, + } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug)] - pub enum FlowBlockConfig {} + pub enum FlowBlockConfig { + Gate { + input: InputPortName, + trigger: InputPortName, + output: OutputPortName, + }, + } impl Named for FlowBlockConfig { fn name(&self) -> Cow { - unreachable!() + use FlowBlockConfig::*; + Cow::Borrowed(match self { + Gate { .. } => "Gate", + }) } } - impl BlockConnections for FlowBlockConfig {} + impl BlockConnections for FlowBlockConfig { + fn output_connections(&self) -> Vec<(&'static str, Option)> { + use FlowBlockConfig::*; + match self { + Gate { output, .. } => { + vec![("output", Some(output.clone()))] + } + } + } + } + + impl BlockInstantiation for FlowBlockConfig { + fn instantiate(&self, system: &mut System) -> Box { + use super::SystemBuilding; + use FlowBlockConfig::*; + match self { + Gate { .. } => Box::new(super::Gate::<_, ()>::new( + system.input_any(), + system.input(), + system.output_any(), + )), + } + } + } - impl BlockInstantiation for FlowBlockConfig {} + mod gate; + pub use gate::*; } pub use flow::*; diff --git a/lib/protoflow-blocks/src/blocks/flow/gate.rs b/lib/protoflow-blocks/src/blocks/flow/gate.rs new file mode 100644 index 00000000..c228797c --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/flow/gate.rs @@ -0,0 +1,132 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{prelude::Vec, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{ + types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, +}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// A block that keeps all messages it receives, +/// and sends them downstream when triggered. +/// +/// When triggered, the block will send all messages it received since last trigger, +/// and _WILL_ clean the internal buffer. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/flow/gate.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/flow/gate.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.read_stdin(); +/// let gate = s.gate(); +/// s.connect(&stdin.output, &gate.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Gate +/// ``` +/// +#[derive(Block, Clone)] +pub struct Gate { + /// The input message stream. + #[input] + pub input: InputPort, + + /// The trigger port. + #[input] + pub trigger: InputPort, + + /// The output message stream. + #[output] + pub output: OutputPort, + + /// The internal state storing the messages received. + #[state] + messages: Vec, +} + +impl Gate { + pub fn new( + input: InputPort, + trigger: InputPort, + output: OutputPort, + ) -> Self { + Self { + input, + trigger, + output, + messages: Vec::new(), + } + } + + pub fn messages(&self) -> &Vec { + &self.messages + } +} + +impl Gate { + pub fn with_system(system: &System) -> Self { + use crate::SystemBuilding; + Self::new(system.input(), system.input(), system.output()) + } +} + +impl Block for Gate { + fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult { + while let Some(message) = self.input.recv()? { + self.messages.push(message); + } + + while let Some(_) = self.trigger.recv()? { + let iter = self.messages.drain(..); + for message in iter { + self.output.send(&message)?; + } + } + + Ok(()) + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Gate { + fn build_system(config: StdioConfig) -> Result { + use crate::{FlowBlocks, SystemBuilding}; + + config.reject_any()?; + + Ok(System::build(|s| { + let stdin = config.read_stdin(s); + let gate = s.gate(); + s.connect(&stdin.output, &gate.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::Gate; + use crate::{System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Gate::::new(s.input(), s.input(), s.output())); + }); + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 41d1decd..fa19e439 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -60,6 +60,7 @@ pub fn build_stdio_system( "Drop" => Drop::::build_system(config)?, "Random" => Random::::build_system(config)?, // FlowBlocks + "Gate" => Gate::::build_system(config)?, // HashBlocks #[cfg(any( feature = "hash-blake3", diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 1ac658c0..b7b4e0c0 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -6,9 +6,9 @@ use crate::{ prelude::{fmt, Arc, Box, Bytes, FromStr, Rc, String, ToString}, types::{DelayType, Encoding}, AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, - DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks, - TextBlocks, WriteFile, WriteStderr, WriteStdout, + DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, Gate, + HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, + SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout, }; #[cfg(all(feature = "std", feature = "serde"))] use crate::{ReadSocket, WriteSocket}; @@ -170,7 +170,11 @@ impl CoreBlocks for System { } } -impl FlowBlocks for System {} +impl FlowBlocks for System { + fn gate(&mut self) -> Gate { + self.0.block(Gate::::with_system(self)) + } +} #[cfg(not(any( feature = "hash-blake3", From 6d7f972349999658943ab10e3aa1290052a948b3 Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Thu, 19 Dec 2024 13:33:16 +0200 Subject: [PATCH 2/3] fix CoreBlocks trait --- lib/protoflow-blocks/src/blocks/flow.rs | 4 +++- lib/protoflow-blocks/src/blocks/flow/gate.rs | 2 +- lib/protoflow-blocks/src/system.rs | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/protoflow-blocks/src/blocks/flow.rs b/lib/protoflow-blocks/src/blocks/flow.rs index 911c7ed6..83596da8 100644 --- a/lib/protoflow-blocks/src/blocks/flow.rs +++ b/lib/protoflow-blocks/src/blocks/flow.rs @@ -11,7 +11,9 @@ pub mod flow { use protoflow_core::{Block, Message}; pub trait FlowBlocks { - fn gate(&mut self) -> Gate; + fn gate( + &mut self, + ) -> Gate; } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] diff --git a/lib/protoflow-blocks/src/blocks/flow/gate.rs b/lib/protoflow-blocks/src/blocks/flow/gate.rs index c228797c..450c9034 100644 --- a/lib/protoflow-blocks/src/blocks/flow/gate.rs +++ b/lib/protoflow-blocks/src/blocks/flow/gate.rs @@ -111,7 +111,7 @@ impl StdioSystem for Gate { Ok(System::build(|s| { let stdin = config.read_stdin(s); - let gate = s.gate(); + let gate = s.gate::<_, ()>(); s.connect(&stdin.output, &gate.input); })) } diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index b7b4e0c0..5b4743fd 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -171,8 +171,10 @@ impl CoreBlocks for System { } impl FlowBlocks for System { - fn gate(&mut self) -> Gate { - self.0.block(Gate::::with_system(self)) + fn gate( + &mut self, + ) -> Gate { + self.0.block(Gate::::with_system(self)) } } From 9845faa19f742f6a3322281c7cfbd246c7c6c0ee Mon Sep 17 00:00:00 2001 From: Yura Menshov Date: Thu, 19 Dec 2024 13:33:54 +0200 Subject: [PATCH 3/3] change example --- lib/protoflow-blocks/src/blocks/flow/gate.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/protoflow-blocks/src/blocks/flow/gate.rs b/lib/protoflow-blocks/src/blocks/flow/gate.rs index 450c9034..52abc672 100644 --- a/lib/protoflow-blocks/src/blocks/flow/gate.rs +++ b/lib/protoflow-blocks/src/blocks/flow/gate.rs @@ -28,8 +28,14 @@ use simple_mermaid::mermaid; /// # fn main() { /// System::build(|s| { /// let stdin = s.read_stdin(); +/// let hello = s.const_string("Hello, World!"); +/// let encode = s.encode_lines(); /// let gate = s.gate(); -/// s.connect(&stdin.output, &gate.input); +/// let stdout = s.write_stdout(); +/// s.connect(&hello.output, &encode.input); +/// s.connect(&encode.output, &gate.input); +/// s.connect(&stdin.output, &gate.trigger); +/// s.connect(&gate.output, &stdout.input); /// }); /// # } /// ```