Skip to content

Commit

Permalink
doc: improve documentation of Input / Output
Browse files Browse the repository at this point in the history
  • Loading branch information
J-Loudet committed Apr 19, 2023
1 parent 5374389 commit b86debf
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 93 deletions.
94 changes: 50 additions & 44 deletions zenoh-flow/src/io/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,37 @@ use std::ops::Deref;
use std::sync::Arc;
use uhlc::Timestamp;

/// The [`Inputs`](`Inputs`) structure contains all the receiving channels we created for a
/// [`Sink`](`crate::prelude::Sink`) or an [`Operator`](`crate::prelude::Operator`).
/// The `Inputs` structure contains all the inputs created for a [Sink](crate::prelude::Sink) or an
/// [Operator](crate::prelude::Operator).
///
/// To access these underlying channels, two methods are available:
/// - `take`: this will return an `Input<T>` where `T` implements [`ZFData`](`ZFData`),
/// - `take_raw`: this will return an [`InputRaw`](`InputRaw`) — a type agnostic receiver.
/// Each input is indexed by its **port identifier**: the name that was indicated in the descriptor
/// of the node. These names are _case sensitive_ and should be an exact match to what was written
/// in the descriptor.
///
/// Choosing between `take` and `take_raw` is a trade-off between convenience and performance: an
/// `Input<T>` conveniently receives instances of `T` and is thus less performant as Zenoh-Flow has
/// to manipulate the data; an `InputRaw` is more performant but all the manipulation must be
/// performed in the code of the Node. An `InputRaw` is currently leveraged by the bindings in other
/// programming languages — as they cannot understand the types coming from Rust — but can also be
/// leveraged, for instance, for load-balancing or rate-limiting.
/// Zenoh-Flow provides two flavors of input: [InputRaw] and [`Input<T>`]. An [`Input<T>`]
/// conveniently exposes instances of `T` while an [InputRaw] exposes messages, allowing to
/// disregard the contained data.
///
/// The main way to interact with `Inputs` is through the `take` method.
///
/// # Example
///
/// ```ignore
/// let input_builder = inputs.take("test raw").expect("No input name 'test raw' found");
/// let input_raw = input_builder.build_raw();
///
/// let input_builder = inputs.take("test typed").expect("No input name 'test typed' found");
/// let input: Input<u64> = input_build.build_typed(
/// |bytes| serde_json::from_slice(bytes)
/// .map_err(|e| anyhow::anyhow!(e))
/// )?;
/// ```
pub struct Inputs {
pub(crate) hmap: HashMap<PortId, Vec<flume::Receiver<LinkMessage>>>,
}

// Dereferencing on the internal [`HashMap`](`std::collections::Hashmap`) allows users to call all the methods
// implemented on it: `keys()` for one.
// Dereferencing on the internal `Hashmap` allows users to call all the methods implemented on it:
// `keys()` for one.
impl Deref for Inputs {
type Target = HashMap<PortId, Vec<flume::Receiver<LinkMessage>>>;

Expand All @@ -56,8 +68,8 @@ impl Inputs {
}
}

/// Insert the `flume::Receiver` in the [`Inputs`](`Inputs`), creating the entry if needed in
/// the internal `HashMap`.
/// Insert the `flume::Receiver` in the [Inputs], creating the entry if needed in the internal
/// `HashMap`.
pub(crate) fn insert(&mut self, port_id: PortId, rx: flume::Receiver<LinkMessage>) {
self.hmap
.entry(port_id)
Expand Down Expand Up @@ -209,16 +221,16 @@ impl InputRaw {
self.receivers.len()
}

/// Returns the first [`LinkMessage`](`LinkMessage`) that was received on any of the channels
/// associated with this Input, or an `Empty` error if there were no messages.
/// Returns the first [LinkMessage] that was received on any of the channels associated with
/// this Input, or an `Empty` error if there were no messages.
///
/// ## Asynchronous alternative: `recv`
/// # Asynchronous alternative: `recv`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`.
/// Although synchronous, but given it is "fail-fast", this method will not block the thread on
/// which it is executed.
///
/// ## Error
/// # Error
///
/// If no message was received, an `Empty` error is returned. Note that if some channels are
/// disconnected, for each of such channel an error is logged.
Expand All @@ -238,13 +250,12 @@ impl InputRaw {
bail!(ErrorKind::Empty, "[Input: {}] No message", self.port_id)
}

/// Returns the first [`LinkMessage`](`LinkMessage`) that was received, *asynchronously*, on any
/// of the channels associated with this Input.
/// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels
/// associated with this Input.
///
/// If several [`LinkMessage`](`LinkMessage`) are received at the same time, one is *randomly*
/// selected.
/// If several [LinkMessage] are received at the same time, one is *randomly* selected.
///
/// ## Error
/// # Error
///
/// An error is returned if *all* channels are disconnected. For each disconnected channel, an
/// error is separately logged.
Expand Down Expand Up @@ -276,22 +287,19 @@ impl InputRaw {
}
}

/// A typed `Input` that tries to automatically downcast or deserialize the data received into an
/// instance of `T`.
/// A typed `Input` that tries to automatically downcast or deserialize the data received in order
/// to expose `&T`.
///
/// ## Performance
/// # Performance
///
/// If the data is received serialized from the upstream node, an allocation is performed to host
/// the deserialized `T`.
///
/// If the data is received typed, the downcast performs a pointer indirection.
pub struct Input<T> {
pub(crate) input_raw: InputRaw,
pub(crate) deserializer: Arc<DeserializerFn<T>>,
}

// Dereferencing to the [`InputRaw`](`InputRaw`) allows to directly call methods on it with a typed
// [`Input`](`Input`).
// Dereferencing to the [InputRaw] allows to directly call methods on it with a typed [Input].
impl<T: Send + Sync + 'static> Deref for Input<T> {
type Target = InputRaw;

Expand All @@ -301,23 +309,21 @@ impl<T: Send + Sync + 'static> Deref for Input<T> {
}

impl<T: Send + Sync + 'static> Input<T> {
/// Returns the first [`Message<T>`](`Message`) that was received, *asynchronously*, on any of
/// the channels associated with this Input.
/// Returns the first [`Message<T>`] that was received, *asynchronously*, on any of the channels
/// associated with this Input.
///
/// If several [`Message<T>`](`Message`) are received at the same time, one is *randomly*
/// selected.
/// If several [`Message<T>`] are received at the same time, one is *randomly* selected.
///
/// This method interprets the data to the type associated with this [`Input<T>`](`Input`).
/// This method interprets the data to the type associated with this [`Input<T>`].
///
/// ## Performance
/// # Performance
///
/// As this method interprets the data received additional operations are performed:
/// - data received serialized is deserialized (an allocation is performed to store an instance
/// of `T`),
/// - data received "typed" are checked against the type associated to this
/// [`Input<T>`](`Input`).
/// - data received "typed" are checked against the type associated to this [`Input<T>`].
///
/// ## Error
/// # Error
///
/// Several errors can occur:
/// - all the channels are disconnected,
Expand All @@ -332,15 +338,15 @@ impl<T: Send + Sync + 'static> Input<T> {
}
}

/// Returns the first [`Message<T>`](`Message`) that was received on any of the channels
/// associated with this Input, or `None` if all the channels are empty.
/// Returns the first [`Message<T>`] that was received on any of the channels associated with this
/// Input, or `None` if all the channels are empty.
///
/// ## Asynchronous alternative: `recv`
/// # Asynchronous alternative: `recv`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`.
/// Although synchronous, this method will not block the thread on which it is executed.
///
/// ## Error
/// # Error
///
/// Several errors can occur:
/// - no message was received (i.e. Empty error),
Expand Down
94 changes: 46 additions & 48 deletions zenoh-flow/src/io/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@ use std::sync::{
};
use uhlc::{Timestamp, HLC};

/// The [`Outputs`](`Outputs`) structure contains all the sender channels we created for a
/// [`Source`](`crate::prelude::Source`) or an [`Operator`](`crate::prelude::Operator`).
/// The [Outputs] structure contains all the outputs created for a [Source](crate::prelude::Source)
/// or an [Operator](crate::prelude::Operator).
///
/// To access these underlying channels, two methods are available:
/// - `take`: this will return an `Output<T>` where `T` implements [`ZFData`](`ZFData`),
/// - `take_raw`: this will return an [`OutputRaw`](`OutputRaw`) — a type agnostic sender.
/// Each output is indexed by its **port identifier**: the name that was indicated in the descriptor
/// of the node. These names are _case sensitive_ and should be an exact match to what was written
/// in the descriptor.
///
/// Choosing between `take` and `take_raw` is a trade-off between convenience and performance: an
/// `Output<T>` conveniently accepts instances of `T` and is thus less performant as Zenoh-Flow has
/// to manipulate the data (transforming it into a [`LinkMessage`](`LinkMessage`)); an `OutputRaw`
/// is more performant but the transformation must be performed in the code of the Node.
/// Zenoh-Flow provides two flavors of output: [OutputRaw] and [`Output<T>`]. An [`Output<T>`]
/// conveniently accepts instances of `T` while an [OutputRaw] operates at the message level,
/// potentially disregarding the data it contains.
pub struct Outputs {
pub(crate) hmap: HashMap<PortId, Vec<flume::Sender<LinkMessage>>>,
pub(crate) hlc: Arc<HLC>,
Expand Down Expand Up @@ -213,11 +212,11 @@ impl OutputBuilder {
}
}

/// An [`OutputRaw`](`OutputRaw`) sends [`LinkMessage`](`LinkMessage`) to downstream Nodes.
/// An [OutputRaw] sends [LinkMessage] or `Into<`[Payload]`>` to downstream Nodes.
///
/// It's primary purpose is to ensure "optimal" performance by performing no operation on the
/// received [`LinkMessage`](`LinkMessage`). This can be useful to implement behaviour where actual
/// access to the underlying data is irrelevant.
/// Its primary purpose is to ensure optimal performance: any message received on an input can
/// transparently be sent downstream, without requiring (a potentially expensive) access to the data
/// it contained.
#[derive(Clone)]
pub struct OutputRaw {
pub(crate) port_id: PortId,
Expand All @@ -239,7 +238,7 @@ impl OutputRaw {

/// If a timestamp is provided, check that it is not inferior to the latest watermark.
///
/// If no timestamp is provided, a new one is generated from the HLC.
/// If no timestamp is provided, a new one is generated from the [HLC](uhlc::HLC).
pub(crate) fn check_timestamp(&self, timestamp: Option<u64>) -> Result<Timestamp> {
let ts = match timestamp {
Some(ts_u64) => Timestamp::new(uhlc::NTP64(ts_u64), *self.hlc.get_id()),
Expand All @@ -255,13 +254,13 @@ impl OutputRaw {

/// Attempt to forward, *synchronously*, the message to the downstream Nodes.
///
/// ## Asynchronous alternative: `forward`
/// # Asynchronous alternative: `forward`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart:
/// `forward`. Hence, although synchronous, this method will not block the thread on which it is
/// executed.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it
/// on the remaining channels. For each failing channel, an error is logged.
Expand Down Expand Up @@ -294,17 +293,17 @@ impl OutputRaw {
Ok(())
}

/// Attempt to send, synchronously, the `data` on all channels to the downstream Nodes.
/// Attempt to send, *synchronously*, the `data` on all channels to the downstream Nodes.
///
/// If no `timestamp` is provided, the current timestamp as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node is taken.
/// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node) is taken.
///
/// ## Asynchronous alternative: `send`
/// # Asynchronous alternative: `send`
///
/// This method is a synchronous fail-fast alternative to its asynchronous counterpart: `send`.
/// Hence, although synchronous, this method will not block the thread on which it is executed.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send
/// it on the remaining channels. For each failing channel, an error is logged and counted for.
Expand All @@ -317,15 +316,15 @@ impl OutputRaw {

/// Attempt to send, *synchronously*, the watermark on all channels to the downstream Nodes.
///
/// If no `timestamp` is provided, the current timestamp as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node is taken.
/// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node) is taken.
///
/// ## Asynchronous alternative: `send_watermark`
/// # Asynchronous alternative: `send_watermark`
///
/// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `send`.
/// Although synchronous, this method will not block the thread on which it is executed.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send
/// it on the remaining channels. For each failing channel, an error is logged and counted for.
Expand All @@ -337,10 +336,9 @@ impl OutputRaw {
self.try_forward(message)
}

/// Forward, *asynchronously*, the [`LinkMessage`](`LinkMessage`) on all channels to the
/// downstream Nodes.
/// Forward, *asynchronously*, the [LinkMessage] on all channels to the downstream Nodes.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send
/// it on the remaining channels. For each failing channel, an error is logged and counted for.
Expand Down Expand Up @@ -380,10 +378,10 @@ impl OutputRaw {

/// Send, *asynchronously*, the `data` on all channels to the downstream Nodes.
///
/// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node — is taken.
/// If no `timestamp` is provided, the current timestamp — as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node — is taken.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send
/// it on the remaining channels. For each failing channel, an error is logged and counted for.
Expand All @@ -394,18 +392,18 @@ impl OutputRaw {
self.forward(message).await
}

/// Send, *asynchronously*, a [`Watermark`](`LinkMessage::Watermark`) on all channels.
/// Send, *asynchronously*, a [Watermark](LinkMessage::Watermark) on all channels.
///
/// If no `timestamp` is provided, the current timestamp as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node is taken.
/// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node) is taken.
///
/// ## Watermarks
/// # Watermarks
///
/// A [`Watermark`](`LinkMessage::Watermark`) is a special kind of message whose purpose is to
/// signal and guarantee the fact that no message with a lower [`Timestamp`](`Timestamp`) will
/// be send afterwards.
/// A [Watermark](LinkMessage::Watermark) is a special kind of message whose purpose is to
/// signal and guarantee the fact that no message with a lower [Timestamp] will be send
/// afterwards.
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send
/// it on the remaining channels. For each failing channel, an error is logged and counted for.
Expand All @@ -418,7 +416,7 @@ impl OutputRaw {
}
}

/// An [`Output<T>`](`Output`) sends instances of `T` to downstream Nodes.
/// An [`Output<T>`] sends instances of `T` to downstream Nodes.
///
/// It's primary purpose is to ensure type guarantees: only types that implement `Into<T>` can be
/// sent to downstream Nodes.
Expand Down Expand Up @@ -453,15 +451,15 @@ impl<T: Send + Sync + 'static> Output<T> {

/// Send, *asynchronously*, the provided `data` to all downstream Nodes.
///
/// If no `timestamp` is provided, the current timestamp as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node is taken.
/// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node) is taken.
///
/// ## Constraint `Into<Data<T>>`
/// # Constraint `Into<Data<T>>`
///
/// Both `T` and `Data<T>` implement this constraint. Hence, in practice, any type that
/// implements `Into<T>` can be sent (provided that `Into::<T>::into(u)` is called first).
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it
/// on the remaining channels. For each failing channel, an error is logged and counted for. The
Expand All @@ -474,15 +472,15 @@ impl<T: Send + Sync + 'static> Output<T> {

/// Tries to send the provided `data` to all downstream Nodes.
///
/// If no `timestamp` is provided, the current timestamp as per the [`HLC`](`HLC`) used by the
/// Zenoh-Flow daemon running this Node is taken.
/// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by
/// the Zenoh-Flow daemon running this Node) is taken.
///
/// ## Constraint `Into<Data<T>>`
/// # Constraint `Into<Data<T>>`
///
/// Both `T` and `Data<T>` implement this constraint. Hence, in practice, any type that
/// implements `Into<T>` can be sent (provided that `Into::<T>::into(u)` is called first).
///
/// ## Errors
/// # Errors
///
/// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it
/// on the remaining channels. For each failing channel, an error is logged and counted for. The
Expand Down
Loading

0 comments on commit b86debf

Please sign in to comment.