Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The built-in blocks provided by Protoflow are listed below:
| [`EncodeCSV`] | Encodes the provided header and rows, given as `prost_types::Value`, into a CSV-formatted byte stream. |
| [`EncodeHex`] | Encodes a byte stream into hexadecimal form. |
| [`EncodeJSON`] | Encodes messages into JSON format. |
| [`Gate`] | Keeps all messages it receives, and sends them downstream when triggered. |
| [`Hash`] | Computes the cryptographic hash of a byte stream. |
| [`Random`] | Generates and sends a random value. |
| [`ReadDir`] | Reads file names from a file system directory. |
Expand Down Expand Up @@ -455,6 +456,33 @@ block-beta
protoflow execute EncodeJSON
```

#### [`Gate`]

A block that keeps all messages it receives, and sends them downstream when triggered.

```mermaid
block-beta
columns 7
Source space:2 Count space:2 Sink
space:7
space:7
space:3 Pulse space:3
Source-- "input" -->Gate
Pulse-- "trigger" -->Gate
Gate-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Gate block
class Source hidden
class Sink hidden
class Pulse hidden
```

```bash
protoflow execute Gate
```

#### [`Hash`]

A block that computes the cryptographic hash of a byte stream, while optionally
Expand Down
16 changes: 16 additions & 0 deletions lib/protoflow-blocks/doc/flow/gate.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
block-beta
columns 7
Source space:2 Count space:2 Sink
space:7
space:7
space:3 Pulse space:3
Source-- "input" -->Gate
Pulse-- "trigger" -->Gate
Gate-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Gate block
class Source hidden
class Sink hidden
class Pulse hidden
32 changes: 32 additions & 0 deletions lib/protoflow-blocks/doc/flow/gate.seq.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
sequenceDiagram
autonumber
participant BlockA as Another block (input source)
participant Gate.input as Gate.input port
participant Gate as Gate block
participant Gate.output as Gate.output port
participant BlockB as Another block (downstream sink)
participant Gate.trigger as Gate.trigger port
participant BlockC as Another block (trigger source)

BlockA-->>Gate: Connect (input)
BlockC-->>Gate: Connect (trigger)
Gate-->>BlockB: Connect (output)

loop Storing messages
BlockA->>Gate: Message
Gate->>Gate: Store message internally
end

BlockC->>Gate: Trigger
loop Releasing messages
Gate->>BlockB: Stored Message
end

BlockA-->>Gate: Disconnect (input)
Gate-->>Gate.input: Close

BlockC-->>Gate: Disconnect (trigger)
Gate-->>Gate.trigger: Close

Gate-->>BlockB: Disconnect (output)
Gate-->>Gate.output: Close
59 changes: 51 additions & 8 deletions lib/protoflow-blocks/src/blocks/flow.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,73 @@
// This is free and unencumbered software released into the public domain.

pub mod flow {
use crate::{InputPortName, OutputPortName};

use super::{
prelude::{Cow, Named},
BlockConnections, BlockInstantiation,
prelude::{vec, Box, Cow, Named, Vec},
BlockConnections, BlockInstantiation, System,
};

pub trait FlowBlocks {}
use protoflow_core::{Block, Message};

pub trait FlowBlocks {
fn gate<Input: Message + 'static, Trigger: Message + 'static>(
&mut self,
) -> Gate<Input, Trigger>;
}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum FlowBlockTag {}
pub enum FlowBlockTag {
Gate,
}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug)]
pub enum FlowBlockConfig {}
pub enum FlowBlockConfig {
Gate {
input: InputPortName,
trigger: InputPortName,
output: OutputPortName,
},
}

impl Named for FlowBlockConfig {
fn name(&self) -> Cow<str> {
unreachable!()
use FlowBlockConfig::*;
Cow::Borrowed(match self {
Gate { .. } => "Gate",
})
}
}

impl BlockConnections for FlowBlockConfig {}
impl BlockConnections for FlowBlockConfig {
fn output_connections(&self) -> Vec<(&'static str, Option<OutputPortName>)> {
use FlowBlockConfig::*;
match self {
Gate { output, .. } => {
vec![("output", Some(output.clone()))]
}
}
}
}

impl BlockInstantiation for FlowBlockConfig {
fn instantiate(&self, system: &mut System) -> Box<dyn Block> {
use super::SystemBuilding;
use FlowBlockConfig::*;
match self {
Gate { .. } => Box::new(super::Gate::<_, ()>::new(
system.input_any(),
system.input(),
system.output_any(),
)),
}
}
}

impl BlockInstantiation for FlowBlockConfig {}
mod gate;
pub use gate::*;
}

pub use flow::*;
138 changes: 138 additions & 0 deletions lib/protoflow-blocks/src/blocks/flow/gate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// This is free and unencumbered software released into the public domain.

use crate::{prelude::Vec, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{
types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;

/// A block that keeps all messages it receives,
/// and sends them downstream when triggered.
///
/// When triggered, the block will send all messages it received since last trigger,
/// and _WILL_ clean the internal buffer.
///
/// # Block Diagram
#[doc = mermaid!("../../../doc/flow/gate.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../../doc/flow/gate.seq.mmd" framed)]
///
/// # Examples
///
/// ## Using the block in a system
///
/// ```rust
/// # use protoflow_blocks::*;
/// # fn main() {
/// System::build(|s| {
/// let stdin = s.read_stdin();
/// let hello = s.const_string("Hello, World!");
/// let encode = s.encode_lines();
/// let gate = s.gate();
/// let stdout = s.write_stdout();
/// s.connect(&hello.output, &encode.input);
/// s.connect(&encode.output, &gate.input);
/// s.connect(&stdin.output, &gate.trigger);
/// s.connect(&gate.output, &stdout.input);
/// });
/// # }
/// ```
///
/// ## Running the block via the CLI
///
/// ```console
/// $ protoflow execute Gate
/// ```
///
#[derive(Block, Clone)]
pub struct Gate<Input: Message = Any, Trigger: Message = ()> {
/// The input message stream.
#[input]
pub input: InputPort<Input>,

/// The trigger port.
#[input]
pub trigger: InputPort<Trigger>,

/// The output message stream.
#[output]
pub output: OutputPort<Input>,

/// The internal state storing the messages received.
#[state]
messages: Vec<Input>,
}

impl<Input: Message, Trigger: Message> Gate<Input, Trigger> {
pub fn new(
input: InputPort<Input>,
trigger: InputPort<Trigger>,
output: OutputPort<Input>,
) -> Self {
Self {
input,
trigger,
output,
messages: Vec::new(),
}
}

pub fn messages(&self) -> &Vec<Input> {
&self.messages
}
}

impl<Input: Message + 'static, Trigger: Message + 'static> Gate<Input, Trigger> {
pub fn with_system(system: &System) -> Self {
use crate::SystemBuilding;
Self::new(system.input(), system.input(), system.output())
}
}

impl<Input: Message, Trigger: Message> Block for Gate<Input, Trigger> {
fn execute(&mut self, _: &dyn BlockRuntime) -> BlockResult {
while let Some(message) = self.input.recv()? {
self.messages.push(message);
}

while let Some(_) = self.trigger.recv()? {
let iter = self.messages.drain(..);
for message in iter {
self.output.send(&message)?;
}
}

Ok(())
}
}

#[cfg(feature = "std")]
impl<Input: Message, Trigger: Message> StdioSystem for Gate<Input, Trigger> {
fn build_system(config: StdioConfig) -> Result<System, StdioError> {
use crate::{FlowBlocks, SystemBuilding};

config.reject_any()?;

Ok(System::build(|s| {
let stdin = config.read_stdin(s);
let gate = s.gate::<_, ()>();
s.connect(&stdin.output, &gate.input);
}))
}
}

#[cfg(test)]
mod tests {
use super::Gate;
use crate::{System, SystemBuilding};

#[test]
fn instantiate_block() {
// Check that the block is constructible:
let _ = System::build(|s| {
let _ = s.block(Gate::<i32>::new(s.input(), s.input(), s.output()));
});
}
}
1 change: 1 addition & 0 deletions lib/protoflow-blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn build_stdio_system(
"Drop" => Drop::<String>::build_system(config)?,
"Random" => Random::<u64>::build_system(config)?,
// FlowBlocks
"Gate" => Gate::<String>::build_system(config)?,
// HashBlocks
#[cfg(any(
feature = "hash-blake3",
Expand Down
14 changes: 10 additions & 4 deletions lib/protoflow-blocks/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::{
prelude::{fmt, Arc, Box, Bytes, FromStr, Rc, String, ToString},
types::{DelayType, Encoding},
AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex,
DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks,
IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks,
TextBlocks, WriteFile, WriteStderr, WriteStdout,
DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, Gate,
HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString,
SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout,
};
#[cfg(all(feature = "std", feature = "serde"))]
use crate::{ReadSocket, WriteSocket};
Expand Down Expand Up @@ -170,7 +170,13 @@ impl CoreBlocks for System {
}
}

impl FlowBlocks for System {}
impl FlowBlocks for System {
fn gate<Input: Message + 'static, Trigger: Message + 'static>(
&mut self,
) -> Gate<Input, Trigger> {
self.0.block(Gate::<Input, Trigger>::with_system(self))
}
}

#[cfg(not(any(
feature = "hash-blake3",
Expand Down