From 28d8984db569e195d6c258e7333edfe5fa9c3f9a Mon Sep 17 00:00:00 2001 From: J-Loudet Date: Tue, 2 May 2023 17:21:37 +0200 Subject: [PATCH] feat: support for custom (de)serializer (#156) * feat: support for custom (de)serializer This commit introduces the following changes to Zenoh-Flow: 1. types used within a Zenoh-Flow application do not have to implement the (cumbersome) `ZFData` trait; instead, 2. the types used must be `Send + Sync + 'static`, 3. an Output must know how to serialize the provided type `T` and, respectively, an Input must know how to deserialize bytes into `T`. With these changes, any SerDe compatible data serialization format is supported as well as, for instance, ProtoBuf. In terms of API, the major difference is how the Input and Output are obtained: ```rust input: inputs .take("in", |bytes| todo!("Provide your deserializer here")) .expect("No input called 'in' found"), output: outputs .take("out", |data| todo!("Provide your serializer here")) .expect("No output called 'out' found"), ``` * fix: move `DeserializerFn` in types/message.rs * fix: remove explicit call to `Arc::clone` * fix: add `as_mut_any` on `SendSyncAny` trait * fix: implement/derive Debug for the different messages * feat: InputBuilder and OutputBuilder Instead of having the methods `take` and `take_raw` on the Inputs and Outputs, that give, respectively, the Typed and Raw Input/Output, this commit removes the `take_raw` variant and introduces builders. The builders can be turned into the Typed or Raw variant using the corresponding methods: `build_typed` and `build_raw`. * doc: improve documentation of Input / Output * feat: reuse buffer when serializing (#160) * feat: reuse buffer when serializing This commit tries to minimize the number of allocations performed when serializing data. For this end, the connector and built-in Source now create internal `Vec` buffers that are reused whenever a message is serialized. This change should hopefully improve performance. 
* doc: fix typos * refacto: renamed `build_raw` and `build_typed` to `raw` and `typed` --------- Signed-off-by: Julien Loudet --- zenoh-flow-derive/src/lib.rs | 30 -- zenoh-flow/Cargo.toml | 1 + zenoh-flow/src/io/input.rs | 265 ++++++--- zenoh-flow/src/io/mod.rs | 4 +- zenoh-flow/src/io/output.rs | 297 +++++++---- zenoh-flow/src/io/tests/input-tests.rs | 146 +++++ zenoh-flow/src/io/tests/output-tests.rs | 149 ++++++ zenoh-flow/src/io/tests/test_types.proto | 23 + zenoh-flow/src/lib.rs | 8 +- .../dataflow/instance/builtin/zenoh.rs | 82 +-- .../dataflow/instance/runners/connector.rs | 186 ++++--- zenoh-flow/src/traits.rs | 92 ++-- zenoh-flow/src/types/message.rs | 448 +++++++--------- zenoh-flow/src/zfdata/mod.rs | 503 ------------------ zenoh-flow/tests/data.rs | 89 ---- zenoh-flow/tests/dataflow.rs | 357 ++++++------- 16 files changed, 1274 insertions(+), 1406 deletions(-) create mode 100644 zenoh-flow/src/io/tests/input-tests.rs create mode 100644 zenoh-flow/src/io/tests/output-tests.rs create mode 100644 zenoh-flow/src/io/tests/test_types.proto delete mode 100644 zenoh-flow/src/zfdata/mod.rs delete mode 100644 zenoh-flow/tests/data.rs diff --git a/zenoh-flow-derive/src/lib.rs b/zenoh-flow-derive/src/lib.rs index 0218eda5..a046abc3 100644 --- a/zenoh-flow-derive/src/lib.rs +++ b/zenoh-flow-derive/src/lib.rs @@ -16,36 +16,6 @@ use proc_macro::TokenStream; use quote::quote; use syn::{parse_macro_input, DeriveInput}; -/// The `ZFData` derive macro is provided to help the users -/// in implementing the `DowncastAny` trait. -/// -/// ## Example -/// -/// ```no_compile -/// use zenoh_flow_derive::ZFData; -/// -/// #[derive(Debug, Clone, ZFData)] -/// pub struct ZFString(pub String); -/// ``` -#[proc_macro_derive(ZFData)] -pub fn zf_data_derive(input: TokenStream) -> TokenStream { - let ast = parse_macro_input!(input as DeriveInput); - let ident = &ast.ident; - let gen = quote! 
{ - - impl zenoh_flow::prelude::DowncastAny for #ident { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } - } - }; - gen.into() -} - /// The `export_source` attribute macro is provided to allow the users /// in exporting their source. /// diff --git a/zenoh-flow/Cargo.toml b/zenoh-flow/Cargo.toml index dee34495..a98c5708 100644 --- a/zenoh-flow/Cargo.toml +++ b/zenoh-flow/Cargo.toml @@ -69,6 +69,7 @@ zrpc-macros = { version= "=0.6.1-alpha.1" } [dev-dependencies] tempdir = "0.3.7" +prost = "0.11" [build-dependencies] rustc_version = "0.4.0" diff --git a/zenoh-flow/src/io/input.rs b/zenoh-flow/src/io/input.rs index 3fb491ea..3ed72c53 100644 --- a/zenoh-flow/src/io/input.rs +++ b/zenoh-flow/src/io/input.rs @@ -12,34 +12,47 @@ // ZettaScale Zenoh Team, // -use crate::prelude::{ErrorKind, Message, PortId, ZFData}; -use crate::types::{Data, DataMessage, LinkMessage}; +use crate::prelude::{ErrorKind, Message, PortId}; +use crate::types::{Data, DataMessage, DeserializerFn, LinkMessage}; use crate::{bail, Result}; + use flume::TryRecvError; use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::Deref; +use std::sync::Arc; use uhlc::Timestamp; -/// The [`Inputs`](`Inputs`) structure contains all the receiving channels we created for a -/// [`Sink`](`crate::prelude::Sink`) or an [`Operator`](`crate::prelude::Operator`). +/// The `Inputs` structure contains all the inputs created for a [Sink](crate::prelude::Sink) or an +/// [Operator](crate::prelude::Operator). +/// +/// Each input is indexed by its **port identifier**: the name that was indicated in the descriptor +/// of the node. These names are _case sensitive_ and should be an exact match to what was written +/// in the descriptor. +/// +/// Zenoh-Flow provides two flavors of input: [InputRaw] and [`Input`]. 
An [`Input`] +/// conveniently exposes instances of `T` while an [InputRaw] exposes messages, allowing to +/// disregard the contained data. /// -/// To access these underlying channels, two methods are available: -/// - `take`: this will return an `Input` where `T` implements [`ZFData`](`ZFData`), -/// - `take_raw`: this will return an [`InputRaw`](`InputRaw`) — a type agnostic receiver. +/// The main way to interact with `Inputs` is through the `take` method. /// -/// Choosing between `take` and `take_raw` is a trade-off between convenience and performance: an -/// `Input` conveniently receives instances of `T` and is thus less performant as Zenoh-Flow has -/// to manipulate the data; an `InputRaw` is more performant but all the manipulation must be -/// performed in the code of the Node. An `InputRaw` is currently leveraged by the bindings in other -/// programming languages — as they cannot understand the types coming from Rust — but can also be -/// leveraged, for instance, for load-balancing or rate-limiting. +/// # Example +/// +/// ```ignore +/// let input_builder = inputs.take("test raw").expect("No input named 'test raw' found"); +/// let input_raw = input_builder.raw(); +/// +/// let input_builder = inputs.take("test typed").expect("No input named 'test typed' found"); +/// let input: Input = input_builder.typed( +/// |bytes| serde_json::from_slice(bytes) +/// .map_err(|e| anyhow::anyhow!(e)) +/// )?; +/// ``` pub struct Inputs { pub(crate) hmap: HashMap>>, } -// Dereferencing on the internal [`HashMap`](`std::collections::Hashmap`) allows users to call all the methods -// implemented on it: `keys()` for one. +// Dereferencing on the internal `Hashmap` allows users to call all the methods implemented on it: +// `keys()` for one. impl Deref for Inputs { type Target = HashMap>>; @@ -55,8 +68,8 @@ impl Inputs { } } - /// Insert the `flume::Receiver` in the [`Inputs`](`Inputs`), creating the entry if needed in - /// the internal `HashMap`.
+ /// Insert the `flume::Receiver` in the [Inputs], creating the entry if needed in the internal + /// `HashMap`. pub(crate) fn insert(&mut self, port_id: PortId, rx: flume::Receiver) { self.hmap .entry(port_id) @@ -64,50 +77,130 @@ impl Inputs { .push(rx) } - /// Returns the typed [`Input`](`Input`) associated to the provided `port_id`, if one is - /// associated, otherwise `None` is returned. + /// Returns an [InputBuilder] for the provided `port_id`, if an input was declared with this + /// exact name in the descriptor of the node, otherwise returns `None`. /// - /// ## Performance + /// # Usage /// - /// With a typed [`Input`](`Input`), Zenoh-Flow will perform operations on the underlying - /// data: if the data are received serialized then Zenoh-Flow will deserialize them, if they are - /// received "typed" then Zenoh-Flow will check that the type matches what is expected. + /// This builder can either produce a, typed, [`Input`] or an [InputRaw]. The main difference + /// between both is the type of data they expose: an [`Input`] automatically tries to downcast + /// or deserialize the data contained in the message to expose `&T`, while an [InputRaw] simply + /// exposes a [LinkMessage]. /// - /// If the underlying data is not relevant then these additional operations can be avoided by - /// calling `take_raw` and using an [`InputRaw`](`InputRaw`) instead. - pub fn take(&mut self, port_id: impl AsRef) -> Option> { - self.hmap.remove(port_id.as_ref()).map(|receivers| Input { - _phantom: PhantomData, - input_raw: InputRaw { - port_id: port_id.as_ref().into(), - receivers, - }, - }) - } - - /// Returns the [`InputRaw`](`InputRaw`) associated to the provided `port_id`, if one is - /// associated, otherwise `None` is returned. + /// As long as data need to be manipulated, a typed [`Input`] should be favored. + /// + /// ## Typed + /// + /// To obtain an [`Input`] one must call `typed` and provide a deserializer function. 
In + /// the example below we rely on the `serde_json` crate to do the deserialization. /// - /// ## Convenience + /// ```ignore + /// let input_typed: Input = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .typed( + /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// ); + /// ``` /// - /// With an [`InputRaw`](`InputRaw`), Zenoh-Flow will not manipulate the underlying data, for - /// instance trying to deserialize them to a certain `T`. + /// ## Raw /// - /// It is thus up to the user to call `try_get::()` on the [`Payload`](`crate::prelude::Payload`) and handle - /// the error that could surface. + /// To obtain an [InputRaw] one must call `raw`. /// - /// If all the data must be "converted" to the same type `T` then calling `take::()` and - /// using a typed [`Input`](`Input`) will be more convenient and as efficient. - pub fn take_raw(&mut self, port_id: impl AsRef) -> Option { + /// ```ignore + /// let input_raw: InputRaw = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .raw(); + /// ``` + pub fn take(&mut self, port_id: impl AsRef) -> Option { self.hmap .remove(port_id.as_ref()) - .map(|receivers| InputRaw { + .map(|receivers| InputBuilder { port_id: port_id.as_ref().into(), receivers, }) } } +/// An `InputBuilder` is the intermediate structure to obtain either an [`Input`] or an +/// [InputRaw]. +/// +/// The main difference between both is the type of data they expose: an [`Input`] automatically +/// tries to downcast or deserialize the data contained in the message to expose `&T`, while an +/// [InputRaw] simply exposes a [LinkMessage]. +/// +/// # Planned evolution +/// +/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the +/// `receivers` channels are _unbounded_ and do not implement a dropping policy, which could lead to +/// issues. 
+pub struct InputBuilder { + pub(crate) port_id: PortId, + pub(crate) receivers: Vec>, +} + +impl InputBuilder { + /// Consume the `InputBuilder` to produce an [InputRaw]. + /// + /// An [InputRaw] exposes the [LinkMessage] it receives, without trying to perform any + /// conversion on the data. + /// + /// The [InputRaw] was designed for use cases such as load-balancing or rate-limiting. In these + /// scenarios, the node does not need to access the underlying data. + /// + /// # `InputRaw` vs `Input` + /// + /// If the node needs access to the data to perform computations, an [`Input`] should be + /// favored as it performs the conversion automatically. + /// + /// # Example + /// + /// ```ignore + /// let input_raw: InputRaw = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .raw(); + /// ``` + pub fn raw(self) -> InputRaw { + InputRaw { + port_id: self.port_id, + receivers: self.receivers, + } + } + + /// Consume the `InputBuilder` to produce an [`Input`]. + /// + /// An [`Input`] tries to automatically convert the data contained in the [LinkMessage] in + /// order to expose `&T`. Depending on if the data is received serialized or not, to perform + /// this conversion either the `deserializer` is called or a downcast is attempted. + /// + /// # `Input` vs `InputRaw` + /// + /// If the node does need to access the data contained in the [LinkMessage], an [InputRaw] + /// should be favored as it does not try to perform the extra conversion steps. 
+ /// + /// # Example + /// + /// ```ignore + /// let input_typed: Input = inputs + /// .take("test") + /// .expect("No input named 'test' found") + /// .typed( + /// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e)) + /// ); + /// ``` + pub fn typed( + self, + deserializer: impl Fn(&[u8]) -> anyhow::Result + Send + Sync + 'static, + ) -> Input { + Input { + input_raw: self.raw(), + deserializer: Arc::new(deserializer), + } + } +} + /// An [`InputRaw`](`InputRaw`) exposes the [`LinkMessage`](`LinkMessage`) it receives. /// /// It's primary purpose is to ensure "optimal" performance. This can be useful to implement @@ -128,16 +221,16 @@ impl InputRaw { self.receivers.len() } - /// Returns the first [`LinkMessage`](`LinkMessage`) that was received on any of the channels - /// associated with this Input, or an `Empty` error if there were no messages. + /// Returns the first [LinkMessage] that was received on any of the channels associated with + /// this Input, or an `Empty` error if there were no messages. /// - /// ## Asynchronous alternative: `recv` + /// # Asynchronous alternative: `recv` /// /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. /// Although synchronous, but given it is "fail-fast", this method will not block the thread on /// which it is executed. /// - /// ## Error + /// # Error /// /// If no message was received, an `Empty` error is returned. Note that if some channels are /// disconnected, for each of such channel an error is logged. @@ -157,13 +250,12 @@ impl InputRaw { bail!(ErrorKind::Empty, "[Input: {}] No message", self.port_id) } - /// Returns the first [`LinkMessage`](`LinkMessage`) that was received, *asynchronously*, on any - /// of the channels associated with this Input. + /// Returns the first [LinkMessage] that was received, *asynchronously*, on any of the channels + /// associated with this Input. 
/// - /// If several [`LinkMessage`](`LinkMessage`) are received at the same time, one is *randomly* - /// selected. + /// If several [LinkMessage] are received at the same time, one is *randomly* selected. /// - /// ## Error + /// # Error /// /// An error is returned if *all* channels are disconnected. For each disconnected channel, an /// error is separately logged. @@ -195,15 +287,20 @@ impl InputRaw { } } -#[derive(Clone, Debug)] -pub struct Input { - _phantom: PhantomData, +/// A typed `Input` that tries to automatically downcast or deserialize the data received in order +/// to expose `&T`. +/// +/// # Performance +/// +/// If the data is received serialized from the upstream node, an allocation is performed to host +/// the deserialized `T`. +pub struct Input { pub(crate) input_raw: InputRaw, + pub(crate) deserializer: Arc>, } -// Dereferencing to the [`InputRaw`](`InputRaw`) allows to directly call methods on it with a typed -// [`Input`](`Input`). -impl Deref for Input { +// Dereferencing to the [InputRaw] allows to directly call methods on it with a typed [Input]. +impl Deref for Input { type Target = InputRaw; fn deref(&self) -> &Self::Target { @@ -211,46 +308,45 @@ impl Deref for Input { } } -impl Input { - /// Returns the first [`Message`](`Message`) that was received, *asynchronously*, on any of - /// the channels associated with this Input. +impl Input { + /// Returns the first [`Message`] that was received, *asynchronously*, on any of the channels + /// associated with this Input. /// - /// If several [`Message`](`Message`) are received at the same time, one is *randomly* - /// selected. + /// If several [`Message`] are received at the same time, one is *randomly* selected. /// - /// This method interprets the data to the type associated with this [`Input`](`Input`). + /// This method interprets the data to the type associated with this [`Input`]. 
/// - /// ## Performance + /// # Performance /// /// As this method interprets the data received additional operations are performed: /// - data received serialized is deserialized (an allocation is performed to store an instance /// of `T`), - /// - data received "typed" are checked against the type associated to this - /// [`Input`](`Input`). + /// - data received "typed" are checked against the type associated to this [`Input`]. /// - /// ## Error + /// # Error /// /// Several errors can occur: /// - all the channels are disconnected, /// - Zenoh-Flow failed at interpreting the received data as an instance of `T`. pub async fn recv(&self) -> Result<(Message, Timestamp)> { match self.input_raw.recv().await? { - LinkMessage::Data(DataMessage { data, timestamp }) => { - Ok((Message::Data(Data::try_new(data)?), timestamp)) - } + LinkMessage::Data(DataMessage { data, timestamp }) => Ok(( + Message::Data(Data::try_from_payload(data, self.deserializer.clone())?), + timestamp, + )), LinkMessage::Watermark(timestamp) => Ok((Message::Watermark, timestamp)), } } - /// Returns the first [`Message`](`Message`) that was received on any of the channels - /// associated with this Input, or `None` if all the channels are empty. + /// Returns the first [`Message`] that was received on any of the channels associated with this + /// Input, or `None` if all the channels are empty. /// - /// ## Asynchronous alternative: `recv` + /// # Asynchronous alternative: `recv` /// /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `recv`. /// Although synchronous, this method will not block the thread on which it is executed. /// - /// ## Error + /// # Error /// /// Several errors can occur: /// - no message was received (i.e. Empty error), @@ -259,10 +355,15 @@ impl Input { /// Note that if some channels are disconnected, for each of such channel an error is logged. pub fn try_recv(&self) -> Result<(Message, Timestamp)> { match self.input_raw.try_recv()? 
{ - LinkMessage::Data(DataMessage { data, timestamp }) => { - Ok((Message::Data(Data::try_new(data)?), timestamp)) - } + LinkMessage::Data(DataMessage { data, timestamp }) => Ok(( + Message::Data(Data::try_from_payload(data, self.deserializer.clone())?), + timestamp, + )), LinkMessage::Watermark(ts) => Ok((Message::Watermark, ts)), } } } + +#[cfg(test)] +#[path = "./tests/input-tests.rs"] +mod tests; diff --git a/zenoh-flow/src/io/mod.rs b/zenoh-flow/src/io/mod.rs index 38d5ca09..b9a0ec3e 100644 --- a/zenoh-flow/src/io/mod.rs +++ b/zenoh-flow/src/io/mod.rs @@ -15,5 +15,5 @@ pub mod input; pub mod output; -pub use input::{Input, InputRaw, Inputs}; -pub use output::{Output, OutputRaw, Outputs}; +pub use input::{Input, InputBuilder, InputRaw, Inputs}; +pub use output::{Output, OutputBuilder, OutputRaw, Outputs}; diff --git a/zenoh-flow/src/io/output.rs b/zenoh-flow/src/io/output.rs index 908f7381..0501fdc6 100644 --- a/zenoh-flow/src/io/output.rs +++ b/zenoh-flow/src/io/output.rs @@ -12,9 +12,9 @@ // ZettaScale Zenoh Team, // -use crate::prelude::{Data, ErrorKind, Payload, PortId, ZFData}; -use crate::types::LinkMessage; -use crate::{zferror, Result}; +use crate::prelude::{Data, ErrorKind, PortId}; +use crate::types::{LinkMessage, Payload, SerializerFn}; +use crate::{bail, zferror, Result}; use flume::Sender; use std::collections::HashMap; use std::marker::PhantomData; @@ -25,17 +25,16 @@ use std::sync::{ }; use uhlc::{Timestamp, HLC}; -/// The [`Outputs`](`Outputs`) structure contains all the sender channels we created for a -/// [`Source`](`crate::prelude::Source`) or an [`Operator`](`crate::prelude::Operator`). +/// The [Outputs] structure contains all the outputs created for a [Source](crate::prelude::Source) +/// or an [Operator](crate::prelude::Operator). 
/// -/// To access these underlying channels, two methods are available: -/// - `take`: this will return an `Output` where `T` implements [`ZFData`](`ZFData`), -/// - `take_raw`: this will return an [`OutputRaw`](`OutputRaw`) — a type agnostic sender. +/// Each output is indexed by its **port identifier**: the name that was indicated in the descriptor +/// of the node. These names are _case sensitive_ and should be an exact match to what was written +/// in the descriptor. /// -/// Choosing between `take` and `take_raw` is a trade-off between convenience and performance: an -/// `Output` conveniently accepts instances of `T` and is thus less performant as Zenoh-Flow has -/// to manipulate the data (transforming it into a [`LinkMessage`](`LinkMessage`)); an `OutputRaw` -/// is more performant but the transformation must be performed in the code of the Node. +/// Zenoh-Flow provides two flavors of output: [OutputRaw] and [`Output`]. An [`Output`] +/// conveniently accepts instances of `T` while an [OutputRaw] operates at the message level, +/// potentially disregarding the data it contains. pub struct Outputs { pub(crate) hmap: HashMap>>, pub(crate) hlc: Arc, @@ -59,63 +58,165 @@ impl Outputs { } } - /// Insert the `flume::Sender` in the [`Inputs`](`Inputs`), creating the entry if needed in the - /// internal `HashMap`. + /// Insert the `flume::Sender` in the [Outputs], creating the entry if needed in the internal + /// `HashMap`. pub(crate) fn insert(&mut self, port_id: PortId, tx: Sender) { self.hmap.entry(port_id).or_insert_with(Vec::new).push(tx) } - /// Returns the typed [`Output`](`Output`) associated to the provided `port_id`, if one is - /// associated, otherwise `None` is returned. + /// Returns an [OutputBuilder] for the provided `port_id`, if an output was declared with this + /// exact name in the descriptor of the node, otherwise returns `None`. 
/// - /// ## Performance + /// # Usage /// - /// With a typed [`Output`](`Output`), only `impl Into>` can be passed to the `send` - /// methods. This provides type guarantees and leaves the encapsulation to Zenoh-Flow. + /// This builder can either produce a, typed, [`Output`] or an [OutputRaw]. The main difference + /// between both is the type of data they accept: an [`Output`] accepts anything that is + /// `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is + /// `Into<`[Payload]`>`. /// - /// If the underlying data is not relevant (i.e. there is no need to access the `T`) then - /// calling `take_raw` and using an [`OutputRaw`](`OutputRaw`) will be more efficient. An - /// [`OutputRaw`](`OutputRaw`) has a dedicated `forward` method that performs no additional - /// operation and, simply, forwards a [`LinkMessage`](`LinkMessage`). - pub fn take(&mut self, port_id: impl AsRef) -> Option> { - self.hmap.remove(port_id.as_ref()).map(|senders| Output { - _phantom: PhantomData, - output_raw: OutputRaw { + /// As long as data are produced or manipulated, a typed [`Output`] should be favored. + /// + /// ## Typed + /// + /// To obtain an [`Output`] one must call `typed` and provide a serializer function. In + /// the example below we rely on the `serde_json` crate to do the serialization. + /// + /// ```ignore + /// let output_typed: Output = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// ``` + /// + /// ## Raw + /// + /// To obtain an [OutputRaw] one must call `raw`. 
+ /// + /// ```ignore + /// let output_raw = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .raw(); + /// ``` + pub fn take(&mut self, port_id: impl AsRef) -> Option { + self.hmap + .remove(port_id.as_ref()) + .map(|senders| OutputBuilder { port_id: port_id.as_ref().into(), senders, hlc: Arc::clone(&self.hlc), last_watermark: Arc::new(AtomicU64::new( self.hlc.new_timestamp().get_time().as_u64(), )), - }, - }) + }) + } +} + +/// An [OutputBuilder] is the intermediate structure to obtain either an [`Output`] or an +/// [OutputRaw]. +/// +/// The main difference between both is the type of data they accept: an [`Output`] accepts +/// anything that is `Into` while an [OutputRaw] accepts a [LinkMessage] or anything that is +/// `Into<`[Payload]`>`. +/// +/// # Planned evolution +/// +/// Zenoh-Flow will allow tweaking the behaviour of the underlying channels. For now, the `senders` +/// channels are _unbounded_ and do not implement a dropping policy, which could lead to issues. +pub struct OutputBuilder { + pub(crate) port_id: PortId, + pub(crate) senders: Vec>, + pub(crate) hlc: Arc, + pub(crate) last_watermark: Arc, +} + +impl OutputBuilder { + /// Consume this `OutputBuilder` to produce an [OutputRaw]. + /// + /// An [OutputRaw] sends [LinkMessage]s (through `forward`) or anything that is + /// `Into<`[Payload]`>` (through `send` and `try_send`) to downstream nodes. + /// + /// The [OutputRaw] was designed for use cases such as load-balancing or rate-limiting. In these + /// scenarios, the node does not need to access the underlying data and the message can simply + /// be forwarded downstream. + /// + /// # `OutputRaw` vs `Output` + /// + /// If the node produces instances of `T` as a result of computations, an [`Output`] should be + /// favored as it sends anything that is `Into`. Thus, contrary to an [OutputRaw], there is + /// no need to encapsulate `T` inside a [Payload].
+ /// + /// # Example + /// + /// ```ignore + /// let output_raw = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .raw(); + /// ``` + pub fn raw(self) -> OutputRaw { + OutputRaw { + port_id: self.port_id, + senders: self.senders, + hlc: self.hlc, + last_watermark: self.last_watermark, + } } - /// Returns the [`OutputRaw`](`OutputRaw`) associated to the provided `port_id`, if one is - /// associated, otherwise `None` is returned. + /// Consume this `OutputBuilder` to produce an [`Output`]. + /// + /// An [`Output`] sends anything that is `Into` (through `send` and `try_send`) to + /// downstream nodes. + /// + /// An [`Output`] requires knowing how to serialize `T`. Data is only serialized when it is (a) + /// transmitted to a node located on another process or (b) transmitted to a node written in a + /// programming language other than Rust. + /// + /// The serialization will automatically be performed by Zenoh-Flow and only when needed. + /// + /// # `Output` vs `OutputRaw` /// - /// ## Convenience + /// If the node does not process any data and only has access to a [LinkMessage], an [OutputRaw] + /// would be better suited as it does not require to downcast it into an object that + /// implements `Into`. /// - /// With an [`OutputRaw`](`OutputRaw`), Zenoh-Flow expects a [`LinkMessage`](`LinkMessage`). It - /// is up to the user to encapsulate an instance of `T` in a [`LinkMessage`](`LinkMessage`). + /// # Example /// - /// If all the data must be encapsulated then calling `take::()` and using a typed - /// [`Output`](`Output`) will be more convenient and *as efficient*. 
- pub fn take_raw(&mut self, port_id: impl AsRef) -> Option { - self.hmap.remove(port_id.as_ref()).map(|senders| OutputRaw { - port_id: port_id.as_ref().into(), - senders, - hlc: Arc::clone(&self.hlc), - last_watermark: Arc::new(AtomicU64::new(self.hlc.new_timestamp().get_time().as_u64())), - }) + /// ```ignore + /// let output_typed: Output = outputs + /// .take("test") + /// .expect("No key named 'test' found") + /// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e))); + /// ``` + pub fn typed( + self, + serializer: impl Fn(&mut Vec, &T) -> anyhow::Result<()> + Send + Sync + 'static, + ) -> Output { + Output { + _phantom: PhantomData, + output_raw: self.raw(), + serializer: Arc::new(move |buffer, data| { + if let Some(typed) = (*data).as_any().downcast_ref::() { + match (serializer)(buffer, typed) { + Ok(serialized_data) => Ok(serialized_data), + Err(e) => bail!(ErrorKind::DeserializationError, e), + } + } else { + bail!( + ErrorKind::DeserializationError, + "Failed to downcast provided value" + ) + } + }), + } } } -/// An [`OutputRaw`](`OutputRaw`) sends [`LinkMessage`](`LinkMessage`) to downstream Nodes. +/// An [OutputRaw] sends [LinkMessage] or `Into<`[Payload]`>` to downstream Nodes. /// -/// It's primary purpose is to ensure "optimal" performance by performing no operation on the -/// received [`LinkMessage`](`LinkMessage`). This can be useful to implement behaviour where actual -/// access to the underlying data is irrelevant. +/// Its primary purpose is to ensure optimal performance: any message received on an input can +/// transparently be sent downstream, without requiring (a potentially expensive) access to the data +/// it contained. #[derive(Clone)] pub struct OutputRaw { pub(crate) port_id: PortId, @@ -137,7 +238,7 @@ impl OutputRaw { /// If a timestamp is provided, check that it is not inferior to the latest watermark. /// - /// If no timestamp is provided, a new one is generated from the HLC. 
+ /// If no timestamp is provided, a new one is generated from the [HLC](uhlc::HLC). pub(crate) fn check_timestamp(&self, timestamp: Option) -> Result { let ts = match timestamp { Some(ts_u64) => Timestamp::new(uhlc::NTP64(ts_u64), *self.hlc.get_id()), @@ -153,13 +254,13 @@ impl OutputRaw { /// Attempt to forward, *synchronously*, the message to the downstream Nodes. /// - /// ## Asynchronous alternative: `forward` + /// # Asynchronous alternative: `forward` /// /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: /// `forward`. Hence, although synchronous, this method will not block the thread on which it is /// executed. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it /// on the remaining channels. For each failing channel, an error is logged. @@ -192,38 +293,38 @@ impl OutputRaw { Ok(()) } - /// Attempt to send, synchronously, the `data` on all channels to the downstream Nodes. + /// Attempt to send, *synchronously*, the `data` on all channels to the downstream Nodes. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. /// - /// ## Asynchronous alternative: `send` + /// # Asynchronous alternative: `send` /// /// This method is a synchronous fail-fast alternative to its asynchronous counterpart: `send`. /// Hence, although synchronous, this method will not block the thread on which it is executed. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send /// it on the remaining channels. For each failing channel, an error is logged and counted for. 
pub fn try_send(&self, data: impl Into, timestamp: Option) -> Result<()> { let ts = self.check_timestamp(timestamp)?; - let message = LinkMessage::from_serdedata(data.into(), ts); + let message = LinkMessage::from_payload(data.into(), ts); self.try_forward(message) } /// Attempt to send, *synchronously*, the watermark on all channels to the downstream Nodes. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. /// - /// ## Asynchronous alternative: `send_watermark` + /// # Asynchronous alternative: `send_watermark` /// /// This method is a synchronous fail-fast alternative to it's asynchronous counterpart: `send`. /// Although synchronous, this method will not block the thread on which it is executed. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send /// it on the remaining channels. For each failing channel, an error is logged and counted for. @@ -235,10 +336,9 @@ impl OutputRaw { self.try_forward(message) } - /// Forward, *asynchronously*, the [`LinkMessage`](`LinkMessage`) on all channels to the - /// downstream Nodes. + /// Forward, *asynchronously*, the [LinkMessage] on all channels to the downstream Nodes. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send /// it on the remaining channels. For each failing channel, an error is logged and counted for. @@ -278,32 +378,32 @@ impl OutputRaw { /// Send, *asynchronously*, the `data` on all channels to the downstream Nodes. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. 
+ /// If no `timestamp` is provided, the current timestamp — as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node — is taken. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send /// it on the remaining channels. For each failing channel, an error is logged and counted for. pub async fn send(&self, data: impl Into, timestamp: Option) -> Result<()> { let ts = self.check_timestamp(timestamp)?; - let message = LinkMessage::from_serdedata(data.into(), ts); + let message = LinkMessage::from_payload(data.into(), ts); self.forward(message).await } - /// Send, *asynchronously*, a [`Watermark`](`LinkMessage::Watermark`) on all channels. + /// Send, *asynchronously*, a [Watermark](LinkMessage::Watermark) on all channels. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. /// - /// ## Watermarks + /// # Watermarks /// - /// A [`Watermark`](`LinkMessage::Watermark`) is a special kind of message whose purpose is to - /// signal and guarantee the fact that no message with a lower [`Timestamp`](`Timestamp`) will - /// be send afterwards. + /// A [Watermark](LinkMessage::Watermark) is a special kind of message whose purpose is to + /// signal and guarantee the fact that no message with a lower [Timestamp] will be send + /// afterwards. /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the watermark on a channel, Zenoh-Flow still tries to send /// it on the remaining channels. For each failing channel, an error is logged and counted for. @@ -316,7 +416,7 @@ impl OutputRaw { } } -/// An [`Output`](`Output`) sends instances of `T` to downstream Nodes. 
+/// An [`Output`] sends instances of `T` to downstream Nodes. /// /// It's primary purpose is to ensure type guarantees: only types that implement `Into` can be /// sent to downstream Nodes. @@ -324,6 +424,7 @@ impl OutputRaw { pub struct Output { _phantom: PhantomData, pub(crate) output_raw: OutputRaw, + pub(crate) serializer: Arc, } // Dereferencing to the [`OutputRaw`](`OutputRaw`) allows to directly call methods on it with a @@ -336,46 +437,60 @@ impl Deref for Output { } } -impl Output { +impl Output { + // Construct the `LinkMessage` to send. + fn construct_message( + &self, + data: impl Into>, + timestamp: Option, + ) -> Result { + let ts = self.check_timestamp(timestamp)?; + let payload = Payload::from_data(data.into(), Arc::clone(&self.serializer)); + Ok(LinkMessage::from_payload(payload, ts)) + } + /// Send, *asynchronously*, the provided `data` to all downstream Nodes. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. /// - /// ## Constraint `Into>` + /// # Constraint `Into>` /// /// Both `T` and `Data` implement this constraint. Hence, in practice, any type that /// implements `Into` can be sent (provided that `Into::::into(u)` is called first). /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it /// on the remaining channels. For each failing channel, an error is logged and counted for. The /// total number of encountered errors is returned. 
pub async fn send(&self, data: impl Into>, timestamp: Option) -> Result<()> { - let ts = self.check_timestamp(timestamp)?; - let message = LinkMessage::from_serdedata(Into::::into(data.into()), ts); - self.output_raw.forward(message).await + self.output_raw + .forward(self.construct_message(data, timestamp)?) + .await } /// Tries to send the provided `data` to all downstream Nodes. /// - /// If no `timestamp` is provided, the current timestamp — as per the [`HLC`](`HLC`) used by the - /// Zenoh-Flow daemon running this Node — is taken. + /// If no `timestamp` is provided, the current timestamp (as per the [HLC](uhlc::HLC) used by + /// the Zenoh-Flow daemon running this Node) is taken. /// - /// ## Constraint `Into>` + /// # Constraint `Into>` /// /// Both `T` and `Data` implement this constraint. Hence, in practice, any type that /// implements `Into` can be sent (provided that `Into::::into(u)` is called first). /// - /// ## Errors + /// # Errors /// /// If an error occurs while sending the message on a channel, Zenoh-Flow still tries to send it /// on the remaining channels. For each failing channel, an error is logged and counted for. The /// total number of encountered errors is returned. pub fn try_send(&self, data: impl Into>, timestamp: Option) -> Result<()> { - let ts = self.check_timestamp(timestamp)?; - let message = LinkMessage::from_serdedata(Into::::into(data.into()), ts); - self.output_raw.try_forward(message) + self.output_raw + .try_forward(self.construct_message(data, timestamp)?) 
} } + +#[cfg(test)] +#[path = "./tests/output-tests.rs"] +mod tests; diff --git a/zenoh-flow/src/io/tests/input-tests.rs b/zenoh-flow/src/io/tests/input-tests.rs new file mode 100644 index 00000000..c8fcd2f5 --- /dev/null +++ b/zenoh-flow/src/io/tests/input-tests.rs @@ -0,0 +1,146 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use prost::Message as pMessage; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use types::Message; + +use super::{Input, InputRaw}; +use crate::{ + traits::SendSyncAny, + types::{self, LinkMessage, Payload}, +}; + +/// Test that the Input behaves as expected for the provided data and deserializer: +/// 1. when a Payload::Bytes is received the deserializer is called and produces the correct output, +/// 2. when a Payload::Typed is received the data can correctly be downcasted. +/// +/// ## Scenario tested +/// +/// A typed input is created. +/// +/// We send on the associated channel: +/// 1. a Payload::Bytes (the `expected_serialized`), +/// 2. a Payload::Typed (the `expected_data` upcasted to `dyn SendSyncAny`). +/// +/// ## Traits bound on T +/// +/// The bounds on `T` are more restrictive than what they are in the code. In particular, `Clone` +/// and `std::fmt::Debug` are not required. This has no impact on the test and mostly help us debug. 
+fn test_typed_input( + expected_data: T, + expected_serialized: Vec, + deserializer: impl Fn(&[u8]) -> anyhow::Result + Send + Sync + 'static, +) { + let hlc = uhlc::HLC::default(); + let (tx, rx) = flume::unbounded::(); + + let input_raw = InputRaw { + port_id: "test-id".into(), + receivers: vec![rx], + }; + + let input = Input { + input_raw, + deserializer: Arc::new(deserializer), + }; + + let message = LinkMessage::from_payload( + Payload::Bytes(Arc::new(expected_serialized)), + hlc.new_timestamp(), + ); + tx.send(message).expect("Failed to send message"); + + let (data, _) = input.try_recv().expect("Message (serialized) was not sent"); + if let Message::Data(data) = data { + assert_eq!(expected_data, *data); + } + + let message = LinkMessage::from_payload( + Payload::Typed(( + Arc::new(expected_data.clone()) as Arc, + // The serializer should never be called, hence the panic. + Arc::new(|_buffer, _data| panic!("Unexpected call to serialize the data")), + )), + hlc.new_timestamp(), + ); + tx.send(message).expect("Failed to send message"); + + let (data, _) = input + .try_recv() + .expect("Message (dyn SendSyncAny) was not sent"); + if let Message::Data(data) = data { + assert_eq!(expected_data, *data); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// SERDE JSON + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct TestData { + pub field1: u8, + pub field2: String, + pub field3: f64, +} + +#[test] +fn test_serde_json() { + let expected_data = TestData { + field1: 1u8, + field2: "test".to_string(), + field3: 0.2f64, + }; + + let expected_serialized = + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + + test_typed_input(expected_data, expected_serialized, |bytes| { + serde_json::de::from_slice::(bytes).map_err(|e| anyhow::anyhow!(e)) + }) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// 
PROTOBUF PROST + +// This structure was generated using the `prost-build` crate. We copied & pasted it here such that +// we do not have to include `prost-build` as a build dependency to Zenoh-Flow. Our only purpose is +// to ensure that at least one implementation of ProtoBuf works, not to suggest to use Prost. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TestProto { + #[prost(int64, tag = "1")] + pub field1: i64, + #[prost(string, tag = "2")] + pub field2: ::prost::alloc::string::String, + #[prost(double, tag = "3")] + pub field3: f64, +} + +#[test] +fn test_protobuf_prost() { + let expected_data = TestProto { + field1: 1i64, + field2: "test".to_string(), + field3: 0.2f64, + }; + + // First test, send data serialized. + let expected_serialized = expected_data.encode_to_vec(); + + test_typed_input(expected_data, expected_serialized, |bytes| { + ::decode(bytes).map_err(|e| anyhow::anyhow!(e)) + }) +} diff --git a/zenoh-flow/src/io/tests/output-tests.rs b/zenoh-flow/src/io/tests/output-tests.rs new file mode 100644 index 00000000..d3a622e4 --- /dev/null +++ b/zenoh-flow/src/io/tests/output-tests.rs @@ -0,0 +1,149 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use prost::Message; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, sync::Arc}; + +use super::Outputs; +use crate::types::{LinkMessage, Payload}; + +/// Test that the Output behaves as expected for the provided data and serializer: +/// 1. 
the `serializer` is correctly type-erased yet still produces the correct output, +/// 2. the `expected_data` is not eagerly serialized and can correctly be downcasted. +/// +/// ## Scenario tested +/// +/// A bogus output is generated — see the call to `outputs.take`. We go through the `Outputs` +/// structure such that the transformation on the serializer is performed (i.e. the type is erased). +/// +/// The provided `expected_data` is sent on the output. +/// +/// A receiver channel ensures that: +/// 1. it is a `Payload::Typed`, +/// 2. we can still downcast it to `T`, +/// 3. the result of the serialization is correct. +/// +/// ## Traits on T +/// +/// The bounds on `T` are more restrictive than what they are in the code. In particular, `Clone` +/// and `std::fmt::Debug` are not required. This has no impact on the test and mostly help us debug. +fn test_typed_output( + expected_data: T, + expected_serialized: Vec, + serializer: impl for<'b, 'a> Fn(&'b mut Vec, &'a T) -> anyhow::Result<()> + + Send + + Sync + + 'static, +) { + let hlc = uhlc::HLC::default(); + let key: Arc = "test".into(); + + let (tx, rx) = flume::unbounded::(); + + let mut outputs = Outputs { + hmap: HashMap::from([(key.clone(), vec![tx])]), + hlc: Arc::new(hlc), + }; + + let output = outputs + .take(&key) + .expect("Wrong key provided") + .typed(serializer); + + output + .try_send(expected_data.clone(), None) + .expect("Failed to send the message"); + + let message = rx.recv().expect("Received no message"); + match message { + LinkMessage::Data(data) => match &*data { + Payload::Bytes(_) => panic!("Unexpected bytes payload"), + Payload::Typed((dyn_data, serializer)) => { + let mut dyn_serialized = Vec::new(); + (serializer)(&mut dyn_serialized, dyn_data.clone()).expect("Failed to serialize"); + assert_eq!(expected_serialized, dyn_serialized); + + let data = (**dyn_data) + .as_any() + .downcast_ref::() + .expect("Failed to downcast"); + assert_eq!(expected_data, *data); + } + }, + 
LinkMessage::Watermark(_) => panic!("Unexpected watermark message"), + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// SERDE JSON + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct TestData { + pub field1: u8, + pub field2: String, + pub field3: f64, +} + +#[test] +fn test_serde_json() { + let expected_data = TestData { + field1: 1u8, + field2: "two".into(), + field3: 0.3f64, + }; + + let expected_serialized = + serde_json::ser::to_vec(&expected_data).expect("serde_json failed to serialize"); + + let serializer = |buffer: &mut Vec, data: &TestData| { + serde_json::ser::to_writer(buffer, data).map_err(|e| anyhow::anyhow!(e)) + }; + + test_typed_output(expected_data, expected_serialized, serializer) +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +/// PROTOBUF PROST + +// This structure was generated using the `prost-build` crate. We copied & pasted it here such that +// we do not have to include `prost-build` as a build dependency to Zenoh-Flow. Our only purpose is +// to ensure that at least one implementation of ProtoBuf works, not to suggest to use Prost. 
+#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TestProto { + #[prost(int64, tag = "1")] + pub field1: i64, + #[prost(string, tag = "2")] + pub field2: ::prost::alloc::string::String, + #[prost(double, tag = "3")] + pub field3: f64, +} + +#[test] +fn test_protobuf_prost() { + let expected_data = TestProto { + field1: 1i64, + field2: "two".into(), + field3: 0.3f64, + }; + + let expected_serialized = expected_data.encode_to_vec(); + + let serializer = |buffer: &mut Vec, data: &TestProto| { + data.encode(buffer).map_err(|e| anyhow::anyhow!(e)) + }; + + test_typed_output(expected_data, expected_serialized, serializer) +} diff --git a/zenoh-flow/src/io/tests/test_types.proto b/zenoh-flow/src/io/tests/test_types.proto new file mode 100644 index 00000000..51859a75 --- /dev/null +++ b/zenoh-flow/src/io/tests/test_types.proto @@ -0,0 +1,23 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +syntax = "proto3"; + +package testtypes.test_types; + +message TestProto { + int64 field1 = 1; + string field2 = 2; + double field3 = 3; +} diff --git a/zenoh-flow/src/lib.rs b/zenoh-flow/src/lib.rs index d6006042..6cb0dd82 100644 --- a/zenoh-flow/src/lib.rs +++ b/zenoh-flow/src/lib.rs @@ -34,7 +34,6 @@ //! Zenoh Flow provides several working examples that illustrate how to //! define operators, sources and sinks as well as how to //! declaratively define they data flow graph by means of a YAML file. 
- use const_format::formatcp; pub use ::zenoh_flow_derive; @@ -44,7 +43,6 @@ pub mod model; pub mod runtime; pub mod traits; pub mod types; -pub mod zfdata; pub mod utils; pub mod zfresult; @@ -54,11 +52,11 @@ pub use zfresult::{DaemonResult, ZFResult as Result}; pub mod prelude { pub use crate::io::{Input, InputRaw, Inputs, Output, OutputRaw, Outputs}; - pub use crate::traits::{DowncastAny, Node, Operator, Sink, Source, ZFData}; + pub use crate::traits::{Node, Operator, SendSyncAny, Sink, Source}; pub use crate::types::{ - Configuration, Context, Data, DataMessage, Message, NodeId, Payload, PortId, RuntimeId, + Configuration, Context, Data, DataMessage, Message, NodeId, PortId, RuntimeId, }; - pub use crate::zenoh_flow_derive::{export_operator, export_sink, export_source, ZFData}; + pub use crate::zenoh_flow_derive::{export_operator, export_sink, export_source}; pub use crate::zferror; pub use crate::zfresult::{Error, ErrorKind, ZFResult as Result}; } diff --git a/zenoh-flow/src/runtime/dataflow/instance/builtin/zenoh.rs b/zenoh-flow/src/runtime/dataflow/instance/builtin/zenoh.rs index 11a6091f..904520a4 100644 --- a/zenoh-flow/src/runtime/dataflow/instance/builtin/zenoh.rs +++ b/zenoh-flow/src/runtime/dataflow/instance/builtin/zenoh.rs @@ -166,10 +166,13 @@ impl<'a> Source for ZenohSource<'a> { })? .to_string(); - let output = outputs.take_raw(id).ok_or(zferror!( - ErrorKind::MissingOutput(id.clone()), - "Unable to find output: {id}" - ))?; + let output = outputs + .take(id) + .ok_or(zferror!( + ErrorKind::MissingOutput(id.clone()), + "Unable to find output: {id}" + ))? + .raw(); let subscriber = context .zenoh_session() .declare_subscriber(&ke) @@ -266,12 +269,24 @@ pub(crate) struct ZenohSink<'a> { _session: Arc, inputs: HashMap, publishers: HashMap>, - futs: Arc>>, - shm: Option>>, + state: Arc>, shm_element_size: usize, shm_backoff: u64, } +/// The ZenohSinkState stores in a single structure all the fields protected by a lock. 
+/// +/// The fields are: +/// - `futs` contains the pending futures waiting for inputs on their channel; +/// - `shm` holds the [SharedMemoryManager] used to send data through Zenoh's shared memory; +/// - `buffer` holds a growable vector of bytes in which the result of the serialization of the data +/// is stored. +pub(crate) struct ZenohSinkState { + pub(crate) futs: Vec, + pub(crate) shm: Option, + pub(crate) buffer: Vec, +} + /// Private function to retrieve the "Constructor" for the ZenohSink pub(crate) fn get_zenoh_sink_declaration() -> NodeDeclaration { NodeDeclaration:: { @@ -371,14 +386,12 @@ impl<'a> Sink for ZenohSink<'a> { let shm_size = shm_element_size * shm_elem_count; - shm_manager = Some(Arc::new(Mutex::new( - SharedMemoryManager::make(id, shm_size).map_err(|_| { - zferror!( - ErrorKind::ConfigurationError, - "Unable to allocate {shm_size} bytes of shared memory" - ) - })?, - ))); + shm_manager = Some(SharedMemoryManager::make(id, shm_size).map_err(|_| { + zferror!( + ErrorKind::ConfigurationError, + "Unable to allocate {shm_size} bytes of shared memory" + ) + })?); } let keyexpressions = configuration.get(KEY_KEYEXPRESSIONS).ok_or_else(|| { @@ -408,10 +421,13 @@ impl<'a> Sink for ZenohSink<'a> { })? .to_string(); - let input = inputs.take_raw(id).ok_or(zferror!( - ErrorKind::MissingInput(id.clone()), - "Unable to find input: {id}" - ))?; + let input = inputs + .take(id) + .ok_or(zferror!( + ErrorKind::MissingInput(id.clone()), + "Unable to find input: {id}" + ))? 
+ .raw(); let subscriber = context.zenoh_session().declare_publisher(ke).res().await?; publishers.insert(id.clone().into(), subscriber); @@ -427,8 +443,11 @@ impl<'a> Sink for ZenohSink<'a> { _session: context.zenoh_session(), inputs: sink_inputs, publishers, - futs: Arc::new(Mutex::new(futs)), - shm: shm_manager, + state: Arc::new(Mutex::new(ZenohSinkState { + futs, + shm: shm_manager, + buffer: Vec::new(), + })), shm_element_size, shm_backoff, }) @@ -448,25 +467,24 @@ impl<'a> Node for ZenohSink<'a> { async fn iteration(&self) -> ZFResult<()> { // Getting the list of futures to poll in a temporary variable (that `select_all` can take // ownership of) - let mut futs = self.futs.lock().await; - let tmp = mem::take(&mut (*futs)); + let mut state = self.state.lock().await; + let tmp = mem::take(&mut state.futs); let ((id, result), _index, mut remaining) = select_all(tmp).await; match result { - Ok(LinkMessage::Data(mut dm)) => { + Ok(LinkMessage::Data(dm)) => { // Getting serialized data - let data = dm.get_inner_data().try_as_bytes()?; + dm.try_as_bytes_into(&mut state.buffer)?; // Getting publisher let publisher = self.publishers.get(&id).ok_or_else(|| { zferror!(ErrorKind::SendError, "Unable to find Publisher for {id}") })?; - match &self.shm { - Some(shm) => { + match state.shm { + Some(ref mut shm) => { // Getting shared memory manager - let mut shm = shm.lock().await; let mut buff = match shm.alloc(self.shm_element_size) { Ok(buf) => buf, Err(_) => { @@ -495,7 +513,7 @@ impl<'a> Node for ZenohSink<'a> { // Getting the underlying slice in the shared memory let slice = unsafe { buff.as_mut_slice() }; - let data_len = data.len(); + let data_len = state.buffer.len(); // If the shared memory block is big enough we send the data through it, // otherwise we go through the network. @@ -504,14 +522,14 @@ impl<'a> Node for ZenohSink<'a> { // We are coping memory here! // we should be able to serialize directly in the shared // memory buffer. 
- slice[0..data_len].copy_from_slice(&data); + slice[0..data_len].copy_from_slice(&state.buffer); publisher.put(buff).res().await?; } else { log::warn!("[ZenohSink] Sending data via network as we are unable to send it over shared memory, the serialized size is {} while shared memory is {}", data_len, self.shm_element_size); - publisher.put(&**data).res().await?; + publisher.put(state.buffer.as_slice()).res().await?; } } - None => publisher.put(&**data).res().await?, + None => publisher.put(state.buffer.as_slice()).res().await?, }; } Ok(_) => (), // Not the right message, ignore it. @@ -528,7 +546,7 @@ impl<'a> Node for ZenohSink<'a> { remaining.push(wait_flow_input(id, input)); // Set back the complete list for the next iteration - *futs = remaining; + state.futs = remaining; Ok(()) } diff --git a/zenoh-flow/src/runtime/dataflow/instance/runners/connector.rs b/zenoh-flow/src/runtime/dataflow/instance/runners/connector.rs index 985f0c96..f6504dd8 100644 --- a/zenoh-flow/src/runtime/dataflow/instance/runners/connector.rs +++ b/zenoh-flow/src/runtime/dataflow/instance/runners/connector.rs @@ -38,11 +38,25 @@ pub(crate) struct ZenohSender { pub(crate) input_raw: InputRaw, pub(crate) z_session: Arc, pub(crate) key_expr: KeyExpr<'static>, - pub(crate) shm: Option>>, + pub(crate) state: Arc>, pub(crate) shm_element_size: usize, pub(crate) shm_backoff: u64, } +/// The `ZenohSenderState` stores in a single structure all the fields protected by a lock. +/// +/// The fields are: +/// - `shm` holds the [SharedMemoryManager] used to send data through Zenoh's shared memory; +/// - `message_buffer` holds a growable vector of bytes in which the result of the serialization of +/// the [LinkMessage] is stored. +/// - `payload_buffer` holds a growable vector of bytes in which the result of the serialization of +/// the [Payload] contained inside the [LinkMessage] is stored. 
+pub(crate) struct ZenohSenderState { + pub(crate) shm: Option, + pub(crate) message_buffer: Vec, + pub(crate) payload_buffer: Vec, +} + impl ZenohSender { /// Creates a new `ZenohSender`. /// @@ -91,14 +105,14 @@ impl ZenohSender { .shared_memory_backoff .unwrap_or(ctx.runtime.shared_memory_backoff); - shm_manager = Some(Arc::new(Mutex::new( + shm_manager = Some( SharedMemoryManager::make(record.resource.clone(), shm_size).map_err(|_| { zferror!( ErrorKind::ConfigurationError, "Unable to allocate {shm_size} bytes of shared memory" ) })?, - ))); + ); shm_element_size = record .shared_memory_element_size @@ -113,9 +127,13 @@ impl ZenohSender { }, z_session: ctx.runtime.session.clone(), key_expr, - shm: shm_manager, shm_element_size, shm_backoff, + state: Arc::new(Mutex::new(ZenohSenderState { + shm: shm_manager, + message_buffer: Vec::default(), + payload_buffer: Vec::default(), + })), }) } } @@ -125,7 +143,7 @@ impl Node for ZenohSender { /// An iteration of a ZenohSender: wait for some data to publish, serialize it using `bincode` /// and publish it on Zenoh. /// - /// ## Errors + /// # Errors /// /// An error variant is returned if: /// - serialization fails @@ -133,83 +151,99 @@ impl Node for ZenohSender { /// - link recv fails async fn iteration(&self) -> ZFResult<()> { match self.input_raw.recv().await { - Ok(message) => match &self.shm { - Some(shm) => { - // Getting shared memory manager - let mut shm = shm.lock().await; - // Getting the shared memory buffer - let mut buff = match shm.alloc(self.shm_element_size) { - Ok(buf) => buf, - Err(_) => { - async_std::task::sleep(std::time::Duration::from_millis( - self.shm_backoff, - )) - .await; - log::trace!( - "[ZenohSender: {}] After failing allocation the GC collected: {} bytes -- retrying", - self.id, - shm.garbage_collect() - ); - log::trace!( - "[ZenohSender: {}] Trying to de-fragment memory... 
De-fragmented {} bytes", - self.id, - shm.defragment() - ); - shm.alloc(self.shm_element_size).map_err(|_| { - zferror!( - ErrorKind::ConfigurationError, - "Unable to allocated {} in the shared memory buffer!", - self.shm_element_size - ) - })? - } - }; + Ok(message) => { + let mut state = self.state.lock().await; - // Getting the underlying slice in the shared memory - let slice = unsafe { buff.as_mut_slice() }; + // NOTE: as per the documentation of Vec::default, which is what the + // `std::mem::take` will call, no allocation is performed until elements are pushed + // in the vector. + let mut message_buffer = std::mem::take(&mut state.message_buffer); + let mut payload_buffer = std::mem::take(&mut state.payload_buffer); - // WARNING ACHTUNG ATTENTION - // This may fail as the message could be bigger than - // the shared memory buffer that was allocated. - match message.serialize_bincode_into(slice) { - Ok(_) => { - // If the serialization is success then we send the - // shared memory buffer. - self.z_session - .put(self.key_expr.clone(), buff) - .congestion_control(CongestionControl::Block) - .res() - .await - } - Err(e) => { - // Otherwise we log a warn and we serialize on a normal - // Vec - let data = message.serialize_bincode()?; - log::warn!( - "[ZenohSender: {}] Unable to serialize into shared memory: {}, serialized size {}, shared memory size {}", - self.id, - e, - data.len(), - self.shm_element_size, - ); + match state.shm { + Some(ref mut shm) => { + // Getting the shared memory buffer + let mut buff = match shm.alloc(self.shm_element_size) { + Ok(buf) => buf, + Err(_) => { + async_std::task::sleep(std::time::Duration::from_millis( + self.shm_backoff, + )) + .await; + log::trace!( + "[ZenohSender: {}] After failing allocation the GC collected: {} bytes -- retrying", + self.id, + shm.garbage_collect() + ); + log::trace!( + "[ZenohSender: {}] Trying to de-fragment memory... 
De-fragmented {} bytes", + self.id, + shm.defragment() + ); + shm.alloc(self.shm_element_size).map_err(|_| { + zferror!( + ErrorKind::ConfigurationError, + "Unable to allocated {} in the shared memory buffer!", + self.shm_element_size + ) + })? + } + }; + + // Getting the underlying slice in the shared memory + let slice = unsafe { buff.as_mut_slice() }; - self.z_session - .put(self.key_expr.clone(), data) - .congestion_control(CongestionControl::Block) - .res() - .await + // WARNING ACHTUNG ATTENTION + // This may fail as the message could be bigger than + // the shared memory buffer that was allocated. + match message.serialize_bincode_into_shm(slice, &mut payload_buffer) { + Ok(_) => { + // If the serialization succeeded then we send the shared memory + // buffer. + self.z_session + .put(self.key_expr.clone(), buff) + .congestion_control(CongestionControl::Block) + .res() + .await?; + } + Err(e) => { + // Otherwise we log a warn and we serialize on a normal + // Vec + message.serialize_bincode_into( + &mut message_buffer, + &mut payload_buffer, + )?; + log::warn!( + "[ZenohSender: {}] Unable to serialize into shared memory: {}, serialized size {}, shared memory size {}", + self.id, + e, + state.message_buffer.len(), + self.shm_element_size, + ); + + self.z_session + .put(self.key_expr.clone(), message_buffer.clone()) + .congestion_control(CongestionControl::Block) + .res() + .await?; + } } } + None => { + message.serialize_bincode_into(&mut message_buffer, &mut payload_buffer)?; + self.z_session + .put(self.key_expr.clone(), message_buffer.clone()) + .congestion_control(CongestionControl::Block) + .res() + .await?; + } } - None => { - let data = message.serialize_bincode()?; - self.z_session - .put(self.key_expr.clone(), data) - .congestion_control(CongestionControl::Block) - .res() - .await - } - }, + + // NOTE: set back the buffers such that we don't have to allocate memory again. 
+ state.message_buffer = message_buffer; + state.payload_buffer = payload_buffer; + Ok(()) + } Err(e) => Err(zferror!( ErrorKind::Disconnected, "[ZenohSender: {}] {:?}", diff --git a/zenoh-flow/src/traits.rs b/zenoh-flow/src/traits.rs index c9f5298a..6f9490dd 100644 --- a/zenoh-flow/src/traits.rs +++ b/zenoh-flow/src/traits.rs @@ -15,73 +15,30 @@ use crate::prelude::{Inputs, Outputs}; use crate::types::{Configuration, Context}; use crate::Result; + use async_trait::async_trait; use std::any::Any; -use std::fmt::Debug; -/// This trait is used to ensure the data can donwcast to [`Any`](`Any`) -/// NOTE: This trait is separate from `ZFData` so that we can provide -/// a `#derive` macro to automatically implement it for the users. -/// -/// This can be derived using the `#[derive(ZFData)]` -/// -/// ## Example -/// -/// ```no_run -/// use zenoh_flow::prelude::*; +/// The `SendSyncAny` trait allows Zenoh-Flow to send data between nodes running in the same process +/// without serializing. /// -/// #[derive(Debug, Clone, ZFData)] -/// pub struct MyString(pub String); -/// ``` -pub trait DowncastAny { - /// Donwcast as a reference to [`Any`](`Any`) +/// This trait is implemented for any type that has the `static` lifetime and implements `Send` and +/// `Sync`. These constraints are the same than for the typed `Input` and `Output` which means that +/// there is absolutely no need to manually implement it. +pub trait SendSyncAny: Send + Sync { fn as_any(&self) -> &dyn Any; - /// Donwcast as a mutable reference to [`Any`](`Any`) fn as_mut_any(&mut self) -> &mut dyn Any; } -/// This trait abstracts the user's data type inside Zenoh Flow. -/// -/// User types should implement this trait otherwise Zenoh Flow will -/// not be able to handle the data, serialize and deserialize them when needed. 
-/// -/// ## Example -/// -/// ```no_run -/// use zenoh_flow::prelude::*; -/// -/// #[derive(Debug, Clone, ZFData)] -/// pub struct MyString(pub String); -/// impl ZFData for MyString { -/// fn try_serialize(&self) -> Result> { -/// Ok(self.0.as_bytes().to_vec()) -/// } -/// -/// fn try_deserialize(bytes: &[u8]) -> Result -/// where -/// Self: Sized, -/// { -/// Ok(MyString( -/// String::from_utf8(bytes.to_vec()).map_err(|e| zferror!(ErrorKind::DeserializationError, e))?, -/// )) -/// } -/// } -/// ``` -pub trait ZFData: DowncastAny + Debug + Send + Sync { - /// Tries to serialize the data as `Vec` - /// - /// # Errors - /// If it fails to serialize an error variant will be returned. - fn try_serialize(&self) -> Result>; +impl SendSyncAny for T { + fn as_any(&self) -> &dyn Any { + self + } - /// Tries to deserialize from a slice of `u8`. - /// - /// # Errors - /// If it fails to deserialize an error variant will be returned. - fn try_deserialize(bytes: &[u8]) -> Result - where - Self: Sized; + fn as_mut_any(&mut self) -> &mut dyn Any { + self + } } /// The `Source` trait represents a Source of data in Zenoh Flow. 
Sources only possess `Outputs` and @@ -114,7 +71,11 @@ pub trait ZFData: DowncastAny + Debug + Send + Sync { /// _configuration: Option, /// mut outputs: Outputs, /// ) -> Result { -/// let output = outputs.take("out").expect("No output called 'out' found"); +/// let output = outputs +/// .take("out") +/// .expect("No output called 'out' found") +/// .typed(|buffer, data| todo!("Provide your serializer here")); +/// /// Ok(Self { output }) /// } /// } @@ -184,7 +145,10 @@ pub trait Source: Node + Send + Sync { /// _configuration: Option, /// mut inputs: Inputs, /// ) -> Result { -/// let input = inputs.take("in").expect("No input called 'in' found"); +/// let input = inputs +/// .take("in") +/// .expect("No input called 'in' found") +/// .typed(|bytes| todo!("Provide your deserializer here")); /// /// Ok(GenericSink { input }) /// } @@ -255,8 +219,14 @@ pub trait Sink: Node + Send + Sync { /// mut outputs: Outputs, /// ) -> Result { /// Ok(NoOp { -/// input: inputs.take("in").expect("No input called 'in' found"), -/// output: outputs.take("out").expect("No output called 'out' found"), +/// input: inputs +/// .take("in") +/// .expect("No input called 'in' found") +/// .typed(|bytes| todo!("Provide your deserializer here")), +/// output: outputs +/// .take("out") +/// .expect("No output called 'out' found") +/// .typed(|buffer, data| todo!("Provide your serializer here")), /// }) /// } /// } diff --git a/zenoh-flow/src/types/message.rs b/zenoh-flow/src/types/message.rs index 4e010526..29101225 100644 --- a/zenoh-flow/src/types/message.rs +++ b/zenoh-flow/src/types/message.rs @@ -12,131 +12,97 @@ // ZettaScale Zenoh Team, // -extern crate serde; - -use crate::traits::ZFData; +use crate::bail; +use crate::prelude::ErrorKind; +use crate::traits::SendSyncAny; use crate::types::{FlowId, NodeId, PortId}; -use crate::zfresult::ErrorKind; -use crate::Result; -use crate::{bail, zferror}; +use crate::{zferror, Result}; +use async_std::sync::Arc; use serde::{Deserialize, 
Serialize}; -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; +use std::ops::Deref; use std::{cmp::Ordering, fmt::Debug}; use uhlc::Timestamp; use uuid::Uuid; -/// A `Payload` is Zenoh-Flow's lowest message container. +/// `SerializerFn` is a type-erased version of the serializer function provided by node developer. /// -/// It either contains serialized data (if received from the network, or from nodes not written in -/// Rust), or `Typed` data as [`ZFData`](`ZFData`). +/// It is passed to downstream nodes (residing on the same process) in case they need to serialize +/// the data they receive typed. +/// Passing around the function allows us to serialize only when needed and without requiring prior +/// knowledge. +pub(crate) type SerializerFn = + dyn Fn(&mut Vec, Arc) -> Result<()> + Send + Sync; + +/// This function is what Zenoh-Flow will use to deserialize the data received on the `Input`. /// -/// The `Typed` data is never serialized directly when sending over Zenoh or to an operator not -/// written in Rust. -#[derive(Clone, Debug, Serialize, Deserialize)] +/// It will be called for instance when data is received serialized (i.e. from an upstream node that +/// is either not implemented in Rust or on a different process) before it is given to the user's +/// code. +pub(crate) type DeserializerFn = dyn Fn(&[u8]) -> anyhow::Result + Send + Sync; + +/// A `Payload` is Zenoh-Flow's lowest message container. +/// +/// It either contains serialized data, i.e. `Bytes` (if received from the network, or from nodes +/// not written in Rust), or `Typed` data as a tuple `(`[Any](`std::any::Any`)`, SerializerFn)`. +#[derive(Clone, Serialize, Deserialize)] pub enum Payload { - /// Serialized data, coming either from Zenoh of from non-rust node. + /// Serialized data, coming either from Zenoh or from a non-Rust node. Bytes(Arc>), #[serde(skip_serializing, skip_deserializing)] - /// Actual data as instance of 'ZFData` coming from a Rust node. 
- /// - /// This is never serialized directly. - Typed(Arc), + /// Data coming from another Rust node located on the same process that can either be downcasted + /// (provided that its actual type is known) or serialized. + Typed((Arc, Arc)), +} + +impl Debug for Payload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Payload::Bytes(_) => write!(f, "Payload::Bytes"), + Payload::Typed(_) => write!(f, "Payload::Typed"), + } + } } impl Payload { - /// Tries to return a serialized representation of the data. - /// - /// It does not actually change the internal representation. The serialized representation in - /// stored inside an `Arc` to avoid copies. - /// - /// # Errors - /// - /// If it fails to serialize an error variant will be returned. - pub fn try_as_bytes(&self) -> Result>> { - match &self { - Self::Bytes(bytes) => Ok(bytes.clone()), - Self::Typed(typed) => { - let serialized_data = typed - .try_serialize() - .map_err(|e| zferror!(ErrorKind::SerializationError, "{:?}", e))?; - Ok(Arc::new(serialized_data)) + pub fn from_data( + data: Data, + serializer: Arc, + ) -> Self { + match data.inner { + DataInner::Payload { payload, data: _ } => payload, + DataInner::Data(data) => { + Self::Typed((Arc::new(data) as Arc, serializer)) } } } - /// Tries to cast the data to the given type. - /// - /// If the data is represented as serialized, this will try to deserialize the bytes and change - /// the internal representation of the data. - /// - /// If the data is already represented with as `Typed` then it will return an *immutable* - /// reference to the internal data. + /// Populate `buffer` with the bytes representation of the [Payload]. /// - /// This reference is *immutable* because one Output can send data to multiple Inputs, therefore - /// to avoid copies the same `Arc` is sent to multiple operators, thus it is multiple-owned and - /// the data inside cannot be modified. 
+ /// # Performance /// - /// # Errors + /// This method will serialize the [Payload] if it is `Typed`. Otherwise, the bytes + /// representation is simply cloned. /// - /// If fails to cast an error variant will be returned. - pub fn try_get(&mut self) -> Result<&Typed> - where - Typed: ZFData + 'static, - { - *self = (match &self { - Self::Bytes(bytes) => { - let data: Arc = Arc::new( - Typed::try_deserialize(bytes.as_slice()) - .map_err(|e| zferror!(ErrorKind::DeserializationError, "{:?}", e))?, - ); - Ok(Self::Typed(data.clone())) - } - Self::Typed(typed) => Ok(Self::Typed(typed.clone())), - } as Result)?; + /// The provided `buffer` is reused and cleared between calls, so once its capacity stabilizes + /// no more allocation is performed. + pub(crate) fn try_as_bytes_into(&self, buffer: &mut Vec) -> Result<()> { + buffer.clear(); // remove previous data but keep the allocated capacity match self { - Self::Typed(typed) => Ok(typed - .as_any() - .downcast_ref::() - .ok_or_else(|| zferror!(ErrorKind::InvalidData, "Could not downcast"))?), - _ => Err(zferror!(ErrorKind::InvalidData, "Should be deserialized first").into()), + Payload::Bytes(bytes) => { + (**bytes).clone_into(buffer); + Ok(()) + } + Payload::Typed((typed_data, serializer)) => { + (serializer)(buffer, Arc::clone(typed_data)) + } } } } -/// Creates a Data from an `Arc` of typed data. -/// The typed data has to be an instance of `ZFData`. -impl From> for Payload -where - UT: ZFData + 'static, -{ - fn from(data: Arc) -> Self { - Self::Typed(data) - } -} - -/// Creates a Data from typed data. -/// The typed data has to be an instance of `ZFData`. -/// The data is then stored inside an `Arc` to avoid copies. -impl From for Payload -where - UT: ZFData + 'static, -{ - fn from(data: UT) -> Self { - Self::Typed(Arc::new(data)) - } -} - -/// Creates a new `Data` from a `Arc>`. 
-impl From>> for Payload { - fn from(bytes: Arc>) -> Self { - Self::Bytes(bytes) - } -} - -/// Creates a new `Data` from a `Vec`, +/// Creates a new `Data` from a `Vec`. +/// /// In order to avoid copies it puts the data inside an `Arc`. impl From> for Payload { fn from(bytes: Vec) -> Self { @@ -151,12 +117,6 @@ impl From<&[u8]> for Payload { } } -impl From> for Payload { - fn from(data: Data) -> Self { - data.payload - } -} - impl From for Payload { fn from(data_message: DataMessage) -> Self { data_message.data @@ -181,52 +141,13 @@ impl Deref for DataMessage { } } -impl DerefMut for DataMessage { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.data - } -} - impl DataMessage { - /// Creates a new [`DataMessage`](`DataMessage`) with given `Data`, - /// `Timestamp` and `Vec`. - pub fn new(data: Payload, timestamp: Timestamp) -> Self { - Self { data, timestamp } - } - - /// Returns a mutable reference over the Data representation (i.e. `Bytes` or `Typed`). - /// - /// This method should be called in conjonction with `try_get::()` in order to get a - /// reference of the desired type. For instance: - /// - /// `let zf_usise: &ZFUsize = data_message.get_inner_data().try_get::()?;` - /// - /// Note that the prerequisite for the above code to work is that `ZFUsize` implements the - /// traits: `ZFData` and `Deserializable`. - pub fn get_inner_data(&mut self) -> &mut Payload { - &mut self.data - } - - /// Returns a reference to the data `Timestamp`. - pub fn get_timestamp(&self) -> &Timestamp { - &self.timestamp - } - /// Creates a new message from serialized data. - /// This is used when the message is coming from Zenoh or from a non-rust - /// node. - pub fn new_serialized(data: Arc>, timestamp: Timestamp) -> Self { - Self { - data: Payload::Bytes(data), - timestamp, - } - } - - /// Creates a messages from `Typed` data. - /// This is used when the data is generated from rust nodes. 
- pub fn new_deserialized(data: Arc, timestamp: Timestamp) -> Self { + /// + /// This is used when the message is coming from Zenoh or from a non-Rust node. + pub fn new_serialized(data: Vec, timestamp: Timestamp) -> Self { Self { - data: Payload::Typed(data), + data: Payload::Bytes(Arc::new(data)), timestamp, } } } @@ -271,63 +192,93 @@ pub enum LinkMessage { impl LinkMessage { /// Creates a `LinkMessage::Data` from a [`Payload`](`Payload`). - pub fn from_serdedata(output: Payload, timestamp: Timestamp) -> Self { - match output { - Payload::Typed(data) => Self::Data(DataMessage::new_deserialized(data, timestamp)), - Payload::Bytes(data) => Self::Data(DataMessage::new_serialized(data, timestamp)), - } + pub fn from_payload(output: Payload, timestamp: Timestamp) -> Self { + Self::Data(DataMessage { + data: output, + timestamp, + }) } - /// Serializes the `Message` using bincode. + /// Serializes the [LinkMessage] using [bincode] into the given `buffer`. + /// + /// The `payload_buffer` is used to serialize (if need be) the [Payload] contained inside the + /// [LinkMessage]. + /// + /// # Performance + /// + /// The provided `buffer` and `payload_buffer` are reused and cleared between calls, so once their + /// capacity stabilizes no (re)allocation is performed. 
/// /// # Errors /// /// An error variant is returned in case of: /// - fails to serialize - pub fn serialize_bincode(&self) -> Result> { + pub fn serialize_bincode_into( + &self, + message_buffer: &mut Vec, + payload_buffer: &mut Vec, + ) -> Result<()> { + payload_buffer.clear(); // empty the buffers but keep their allocated capacity + message_buffer.clear(); + match &self { LinkMessage::Data(data_message) => match &data_message.data { - Payload::Bytes(_) => bincode::serialize(&self) + Payload::Bytes(_) => bincode::serialize_into(message_buffer, &self) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()), - Payload::Typed(_) => { - let serialized_data = data_message.data.try_as_bytes()?; - let serialized_message = LinkMessage::Data(DataMessage::new_serialized( - serialized_data, - data_message.timestamp, - )); - - bincode::serialize(&serialized_message) + Payload::Typed((data, serializer)) => { + (serializer)(payload_buffer, Arc::clone(data))?; + let serialized_message = LinkMessage::Data(DataMessage { + data: Payload::Bytes(Arc::new(payload_buffer.clone())), + timestamp: data_message.timestamp, + }); + + bincode::serialize_into(message_buffer, &serialized_message) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()) } }, - _ => bincode::serialize(&self) + _ => bincode::serialize_into(message_buffer, &self) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()), } } - /// Serializes the `Message` using bincode into the given slice. + /// Serializes the [LinkMessage] using [bincode] into the given `shm_buffer` shared memory + /// buffer. + /// + /// The `payload_buffer` is used to serialize (if need be) the [Payload] contained inside the + /// [LinkMessage]. + /// + /// # Performance + /// + /// The provided `payload_buffer` is reused and cleared between calls, so once its capacity + /// stabilizes no (re)allocation is performed. 
/// /// # Errors /// /// An error variant is returned in case of: /// - fails to serialize /// - there is not enough space in the slice - pub fn serialize_bincode_into(&self, buff: &mut [u8]) -> Result<()> { + pub fn serialize_bincode_into_shm( + &self, + shm_buffer: &mut [u8], + payload_buffer: &mut Vec, + ) -> Result<()> { + payload_buffer.clear(); // empty the buffer but keep the allocated capacity + match &self { LinkMessage::Data(data_message) => match &data_message.data { - Payload::Bytes(_) => bincode::serialize_into(buff, &self) + Payload::Bytes(_) => bincode::serialize_into(shm_buffer, &self) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()), Payload::Typed(_) => { - let serialized_data = data_message.data.try_as_bytes()?; + data_message.try_as_bytes_into(payload_buffer)?; let serialized_message = LinkMessage::Data(DataMessage::new_serialized( - serialized_data, + payload_buffer.clone(), data_message.timestamp, )); - bincode::serialize_into(buff, &serialized_message) + bincode::serialize_into(shm_buffer, &serialized_message) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()) } }, - _ => bincode::serialize_into(buff, &self) + _ => bincode::serialize_into(shm_buffer, &self) .map_err(|e| zferror!(ErrorKind::SerializationError, e).into()), } } @@ -371,7 +322,8 @@ impl Eq for LinkMessage {} /// `recv`. /// /// A `Message` can either contain [`Data`](`Data`), or signal a _Watermark_. -pub enum Message { +#[derive(Debug)] +pub enum Message { Data(Data), Watermark, } @@ -386,26 +338,69 @@ pub enum Message { /// When deserializing, an allocation is performed. #[derive(Debug)] pub struct Data { - payload: Payload, - bytes: Option>, - // CAVEAT: Setting the value **typed** to `None` when it was already set to `Some` can cause a - // panic! The visibility of the entire structure is private to force us not to make such - // mistake. - // In short, tread carefully! 
- typed: Option, + inner: DataInner, +} + +/// The `DataInner` enum represents the two ways to send data in an [`Output`](`Output`). +/// +/// The `Payload` variant corresponds to a previously generated `Data` being sent. +/// The `Data` variant corresponds to a new instance of `T` being sent. +pub(crate) enum DataInner { + Payload { payload: Payload, data: Option }, + Data(T), } -// CAVEAT: The implementation of `Deref` is what allows users to transparently manipulate the type -// `T`. However, for things to go well, the `typed` field MUST BE MANIPULATED WITH CAUTION. -impl Deref for Data { +impl Debug for DataInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DataInner::Payload { payload, data } => { + let data = if data.is_some() { "Some" } else { "None" }; + write!(f, "DataInner::Payload: {:?} - data: {}", payload, data) + } + DataInner::Data(_) => write!(f, "DataInner::Data(T)"), + } + } +} + +// Implementing `From` allows us to accept instances of `T` in the signature of `send` and +// `try_send` methods as `T` will implement `impl Into>`. +impl From for Data { + fn from(value: T) -> Self { + Self { + inner: DataInner::Data(value), + } + } +} + +// The implementation of `Deref` is what allows users to transparently manipulate the type `T`. +// +// ## SAFETY +// +// Despite the presence of `expect` and `panic!`, we should never end up in these situations in +// normal circumstances. +// +// Let us reason here as to why this is "safe". +// +// The call to `expect` happens when the inner data is a [`Typed`](`Payload::Typed`) payload and the +// downcasts to `T` fails. This should not happen because of the way a [`Data`](`Data`) is created: +// upon creation we first perform a check that the provided typed payload can actually be downcasted +// to `T` — see the method `Data::try_from_payload`. 
+// +// The call to `panic!` happens when the inner data is a [`Bytes`](`Payload::Bytes`) payload and the +// `data` field is `None`. Again, this should not happen because of the way a [`Data`](`Data`) is +// created: upon creation, if the data is received as bytes, we first deserialize it and set the +// `data` field to `Some(T)` — see the method `Data::try_from_payload`. +impl Deref for Data { type Target = T; fn deref(&self) -> &Self::Target { - if let Some(ref typed) = self.typed { - typed - } else if let Payload::Typed(ref typed) = self.payload { - typed.as_any().downcast_ref::().expect( - r#"You probably managed to find a very nasty flaw in Zenoh-Flow’s code as we + match &self.inner { + DataInner::Payload { payload, data } => { + if let Some(data) = data { + data + } else if let Payload::Typed((typed, _)) = payload { + (**typed).as_any().downcast_ref::().expect( + r#"You probably managed to find a very nasty flaw in Zenoh-Flow’s code as we believed this situation would never happen (unless explicitely triggered — "explicitely" being an understatement here, we feel it’s more like you really, really, wanted to see that message — in which case, congratulations!). @@ -420,10 +415,10 @@ happened and we would be eager to investigate. Feel free to contact us at < zenoh@zettascale.tech >. "#, - ) - } else { - panic!( - r#"You probably managed to find a very nasty flaw in Zenoh-Flow's code as we + ) + } else { + panic!( + r#"You probably managed to find a very nasty flaw in Zenoh-Flow's code as we believed this situation would never happen (unless explicitely triggered — "explicitely" being an understatement here, we feel it's more like you really, really, wanted to see that message — in which case, congratulations!). @@ -437,12 +432,15 @@ happened and we would be eager to investigate. Feel free to contact us at < zenoh@zettascale.tech >. 
"# - ) + ) + } + } + DataInner::Data(data) => data, } } } -impl Data { +impl Data { /// Try to create a new [`Data`](`Data`) based on a [`Payload`](`Payload`). /// /// Depending on the variant of [`Payload`](`Payload`) different steps are performed: @@ -455,71 +453,29 @@ impl Data { /// /// An error will be returned if the Payload does not match `T`, i.e. if the deserialization or /// the downcast failed. - pub(crate) fn try_new(payload: Payload) -> Result { + pub(crate) fn try_from_payload( + payload: Payload, + deserializer: Arc>, + ) -> Result { let mut typed = None; match payload { - Payload::Bytes(ref bytes) => typed = Some(T::try_deserialize(bytes)?), - Payload::Typed(ref typed) => { - if !(*typed).as_any().is::() { + Payload::Bytes(ref bytes) => typed = Some((deserializer)(bytes.as_slice())?), + Payload::Typed((ref typed, _)) => { + if !(**typed).as_any().is::() { bail!( ErrorKind::DeserializationError, - "Could not downcast payload" + "Failed to downcast provided value", ) } } } Ok(Self { - payload, - bytes: None, - typed, + inner: DataInner::Payload { + payload, + data: typed, + }, }) } - - /// Try to obtain the bytes representation of `T`. - /// - /// Depending on how the [`Payload`](`Payload`) was received, this method either tries to - /// serialize it or simply returns a slice view. - /// - /// ## Errors - /// - /// This method can return an error if the serialization failed. - pub fn try_as_bytes(&mut self) -> Result<&[u8]> { - match &self.payload { - Payload::Bytes(bytes) => Ok(bytes.as_slice()), - Payload::Typed(typed) => { - if self.bytes.is_none() { - self.bytes = Some(typed.try_serialize()?); - } - Ok(self - .bytes - .as_ref() - .expect("This cannot fail as we serialized above")) - } - } - } -} - -// Implementing `From` allows us to accept instances of `T` in the signature of `send` and -// `try_send` methods as `T` will implement `impl Into>`. 
-impl From for Data { - fn from(data: T) -> Self { - let payload = Payload::Typed(Arc::new(data)); - Self { - payload, - bytes: None, - typed: None, - } - } -} - -impl From for Data { - fn from(value: DataMessage) -> Self { - Self { - payload: value.data, - bytes: None, - typed: None, - } - } } diff --git a/zenoh-flow/src/zfdata/mod.rs b/zenoh-flow/src/zfdata/mod.rs deleted file mode 100644 index 8efd5e12..00000000 --- a/zenoh-flow/src/zfdata/mod.rs +++ /dev/null @@ -1,503 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! -//! This module implements the [`ZFData`](`ZFData`) trait for some basic types: signed and unsigned -//! integers, floats, Strings, booleans. -//! -//! The implementation is (for now) rather naïve and thus not the most efficient. -//! -//! ## Caveat: `Vec` -//! -//! Given that data can be received serialized (i.e. in a `Vec` form), the `ZFData` trait should -//! not be implemented for `Vec` directly. Indeed, if we do, then when trying to downcast after -//! deserializing through `::try_deserialize` Rust will assume that the actual type of the -//! data is still `Vec` and not ``. -//! -//! ## Todo -//! -//! - Optimize with additional Reader and Writer traits so as to not require an allocation when -//! serializing (i.e. we provide a `&mut Writer` that Zenoh-Flow controls). -//! - Investigate supporting ProtoBuf and Cap'n'Proto. -//! 
- -use crate::prelude::{DowncastAny, ErrorKind, ZFData}; -use crate::{bail, zferror}; -use std::convert::TryInto; - -/* - * - * Unsigned integers - * - */ - -impl DowncastAny for usize { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for usize { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(usize::from_le_bytes(b)) - } -} - -impl DowncastAny for u128 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for u128 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(u128::from_le_bytes(b)) - } -} - -impl DowncastAny for u64 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for u64 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(u64::from_le_bytes(b)) - } -} - -impl DowncastAny for u32 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for u32 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| 
zferror!(ErrorKind::DeserializationError, e))?; - - Ok(u32::from_le_bytes(b)) - } -} - -impl DowncastAny for u16 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for u16 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(u16::from_le_bytes(b)) - } -} - -impl DowncastAny for u8 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for u8 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(u8::from_le_bytes(b)) - } -} - -/* - * - * Signed integers - * - */ - -impl DowncastAny for isize { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for isize { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(isize::from_le_bytes(b)) - } -} - -impl DowncastAny for i128 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for i128 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - 
Ok(i128::from_le_bytes(b)) - } -} - -impl DowncastAny for i64 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for i64 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(i64::from_le_bytes(b)) - } -} - -impl DowncastAny for i32 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for i32 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(i32::from_le_bytes(b)) - } -} - -impl DowncastAny for i16 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for i16 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(i16::from_le_bytes(b)) - } -} - -impl DowncastAny for i8 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for i8 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(i8::from_le_bytes(b)) - } -} - -/* - * - * Float - * - */ - -impl DowncastAny for f64 { - fn 
as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for f64 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(f64::from_le_bytes(b)) - } -} - -impl DowncastAny for f32 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for f32 { - fn try_serialize(&self) -> crate::Result> { - Ok(self.to_le_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - let b = bytes - .try_into() - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - - Ok(f32::from_le_bytes(b)) - } -} - -/* - * - * Boolean - * - */ - -impl DowncastAny for bool { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for bool { - fn try_serialize(&self) -> crate::Result> { - Ok(match self { - true => vec![1u8], - false => vec![0u8], - }) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - if bytes.len() != 1 { - bail!(ErrorKind::DeserializationError) - } - - match bytes[0] { - 0 => Ok(false), - 1 => Ok(true), - _ => bail!(ErrorKind::DeserializationError), - } - } -} - -/* - * - * String - * - */ - -impl DowncastAny for String { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn as_mut_any(&mut self) -> &mut dyn std::any::Any { - self - } -} - -impl ZFData for String { - fn try_serialize(&self) -> crate::Result> { - Ok(self.as_bytes().to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> crate::Result - where - Self: Sized, - { - Ok(String::from_utf8(bytes.to_vec()) - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?) 
- } -} diff --git a/zenoh-flow/tests/data.rs b/zenoh-flow/tests/data.rs deleted file mode 100644 index 8ed2d86b..00000000 --- a/zenoh-flow/tests/data.rs +++ /dev/null @@ -1,89 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -use serde::{Deserialize, Serialize}; -use std::convert::From; -use std::sync::Arc; -use zenoh_flow::prelude::*; -use zenoh_flow::types::Payload; -use zenoh_flow::zenoh_flow_derive::ZFData; - -#[derive(Debug, ZFData, Clone, Serialize, Deserialize)] -struct TestData { - pub field1: u8, - pub field2: String, - pub field3: f64, -} - -impl ZFData for TestData { - fn try_serialize(&self) -> Result> { - Ok(serde_json::to_string(self) - .map_err(|e| zferror!(ErrorKind::SerializationError, e))? 
- .as_bytes() - .to_vec()) - } - - fn try_deserialize(bytes: &[u8]) -> Result - where - Self: Sized, - { - let json = String::from_utf8(bytes.to_vec()).unwrap(); - let data: TestData = serde_json::from_str(&json) - .map_err(|e| zferror!(ErrorKind::DeserializationError, e))?; - Ok(data) - } -} - -#[test] -fn data_wrapping_unwrapping() { - let _ = env_logger::try_init(); - - let test_data = TestData { - field1: 16u8, - field2: String::from("TestString"), - field3: 123.456f64, - }; - - let mut wrapped_data = Payload::from(test_data.clone()); - - let unwrapped_data = wrapped_data.try_get::().unwrap(); - - assert_eq!(unwrapped_data.field1, test_data.field1); - assert_eq!(unwrapped_data.field2, test_data.field2); - assert!((unwrapped_data.field3 - test_data.field3).abs() < f64::EPSILON); - - let arc_data = Arc::new(test_data.clone()); - - let mut wrapped_data = Payload::from(arc_data); - let unwrapped_data = wrapped_data.try_get::().unwrap(); - - assert_eq!(unwrapped_data.field1, test_data.field1); - assert_eq!(unwrapped_data.field2, test_data.field2); - assert!((unwrapped_data.field3 - test_data.field3).abs() < f64::EPSILON); - - let serialized_data = test_data.try_serialize().unwrap(); - - let mut wrapped_data = Payload::from(serialized_data.clone()); - - assert_eq!( - Arc::from(serialized_data), - wrapped_data.try_as_bytes().unwrap() - ); - - let unwrapped_data = wrapped_data.try_get::().unwrap(); - - assert_eq!(unwrapped_data.field1, test_data.field1); - assert_eq!(unwrapped_data.field2, test_data.field2); - assert!((unwrapped_data.field3 - test_data.field3).abs() < f64::EPSILON); -} diff --git a/zenoh-flow/tests/dataflow.rs b/zenoh-flow/tests/dataflow.rs index 716c68ad..1c9f9383 100644 --- a/zenoh-flow/tests/dataflow.rs +++ b/zenoh-flow/tests/dataflow.rs @@ -13,194 +13,205 @@ // use async_trait::async_trait; +use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use zenoh::prelude::r#async::*; + use zenoh_flow::io::{Inputs, Outputs}; use 
zenoh_flow::model::descriptor::{InputDescriptor, OutputDescriptor}; use zenoh_flow::model::record::{OperatorRecord, PortRecord, SinkRecord, SourceRecord}; use zenoh_flow::runtime::dataflow::instance::DataFlowInstance; use zenoh_flow::runtime::dataflow::loader::{Loader, LoaderConfig}; use zenoh_flow::runtime::RuntimeContext; -use zenoh_flow::types::{Configuration, Context, LinkMessage, Message}; +use zenoh_flow::types::{Configuration, Context, LinkMessage, Message, Payload}; use zenoh_flow::{ prelude::*, DEFAULT_SHM_ALLOCATION_BACKOFF_NS, DEFAULT_SHM_ELEMENT_SIZE, DEFAULT_SHM_TOTAL_ELEMENTS, }; -static SOURCE: &str = "counter-source"; -static OP_RAW: &str = "operator-raw"; -static OP_TYPED: &str = "operator-typed"; -static SINK: &str = "generic-sink"; +static SOURCE: &str = "test-source"; +static OPERATOR: &str = "test-operator"; +static SINK: &str = "test-sink"; static IN_TYPED: &str = "in-typed"; static IN_RAW: &str = "in-raw"; static OUT_TYPED: &str = "out-typed"; static OUT_RAW: &str = "out-raw"; -// SOURCE +static RAW_VALUE: u64 = 10; +static TYPED_VALUE: u64 = 1; -struct CountSource { - output: Output, +// Use the `serde_json` crate to serialize and deserialize some u64 integers. 
+fn serialize_serde_json(buffer: &mut Vec, data: &u64, origin: &str) -> anyhow::Result<()> { + println!("Serializer called in: {origin}!"); + serde_json::ser::to_writer(buffer, data).map_err(|e| anyhow::anyhow!(e)) } -#[async_trait] -impl Source for CountSource { - async fn new( - _context: Context, - _configuration: Option, - mut outputs: Outputs, - ) -> Result { - println!("[CountSource] constructor"); - let output = outputs.take(OUT_TYPED).unwrap(); - - Ok(CountSource { output }) - } +fn deserialize_serde_json(bytes: &[u8], origin: &str) -> anyhow::Result { + println!("Deserializer called in: {origin}!"); + serde_json::de::from_slice::(bytes).map_err(|e| anyhow::anyhow!(e)) } -#[async_trait] -impl Node for CountSource { - async fn iteration(&self) -> Result<()> { - println!("[CountSource] Starting iteration"); - self.output.send(1usize, None).await?; - - println!("[CountSource] iteration done, sleeping"); - async_std::task::sleep(Duration::from_secs(10)).await; - - Ok(()) - } -} - -// OPERATORS +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- +// SOURCE +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- -/// An `OpRaw` uses, internally, only the data received in its `input_raw`. 
-/// -/// The objective is to test the following "forward" of messages: -/// - InputRaw -> Output -/// - InputRaw -> OutputRaw -struct OpRaw { - input_typed: Input, - input_raw: InputRaw, - output_typed: Output, +struct TestSource { + output: Output, output_raw: OutputRaw, } #[async_trait] -impl Operator for OpRaw { +impl Source for TestSource { async fn new( _context: Context, _configuration: Option, - mut inputs: Inputs, mut outputs: Outputs, ) -> Result { - println!("[OpRaw] constructor"); - Ok(OpRaw { - input_typed: inputs.take(IN_TYPED).unwrap(), - input_raw: inputs.take_raw(IN_RAW).unwrap(), - output_typed: outputs.take(OUT_TYPED).unwrap(), - output_raw: outputs.take_raw(OUT_RAW).unwrap(), - }) + println!("[TestSource] constructor"); + let output = outputs + .take(OUT_TYPED) + .expect("No `OUT_TYPED` for TestSource") + .typed(|buffer, data| serialize_serde_json(buffer, data, "TestSource")); + let output_raw = outputs + .take(OUT_RAW) + .expect("No `OUT_RAW` for TestSource") + .raw(); + + Ok(TestSource { output, output_raw }) } } #[async_trait] -impl Node for OpRaw { +impl Node for TestSource { async fn iteration(&self) -> Result<()> { - println!("[OpRaw] iteration being"); + println!("[TestSource] Starting iteration"); + self.output.send(TYPED_VALUE, None).await?; - let link_message = self.input_raw.recv().await?; - let (typed_message, _timestamp) = self.input_typed.recv().await?; + let mut buffer = Vec::new(); + serialize_serde_json(&mut buffer, &RAW_VALUE, "manual") + .expect("Failed to serialize 10u64 using `serde_json`"); + self.output_raw.send(buffer, None).await?; - if let (LinkMessage::Data(mut data_message), Message::Data(data)) = - (link_message, typed_message) - { - assert_eq!(*data_message.try_get::()?, *data); - - self.output_raw.send(data_message.clone(), None).await?; - self.output_typed.send(data_message, None).await?; - } else { - panic!("Unexpected watermark message") - } + println!("[TestSource] iteration done, sleeping"); + 
async_std::task::sleep(Duration::from_secs(10)).await; - println!("[OpRaw] iteration done"); Ok(()) } } -/// An `OpTyped` uses, internally, only the data received in its `input_typed`. -/// -/// The objective is to test the following "forward" of messages: -/// - Input -> Output -/// - Input -> OutputRaw -struct OpTyped { - input_typed: Input, +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- +// OPERATOR +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- + +struct TestOperator { + input_typed: Input, input_raw: InputRaw, - output_typed: Output, + output_typed: Output, output_raw: OutputRaw, } #[async_trait] -impl Operator for OpTyped { +impl Operator for TestOperator { async fn new( _context: Context, _configuration: Option, mut inputs: Inputs, mut outputs: Outputs, ) -> Result { - println!("[OpRaw] constructor"); - Ok(OpTyped { - input_typed: inputs.take(IN_TYPED).unwrap(), - input_raw: inputs.take_raw(IN_RAW).unwrap(), - output_typed: outputs.take(OUT_TYPED).unwrap(), - output_raw: outputs.take_raw(OUT_RAW).unwrap(), + println!("[TestOperator] constructor"); + Ok(TestOperator { + input_typed: inputs + .take(IN_TYPED) + .expect("No input `IN_TYPED` for TestOperator") + .typed(|bytes| deserialize_serde_json(bytes, "TestOperator")), + input_raw: inputs + .take(IN_RAW) + .expect("No input `IN_RAW` for TestOperator") + .raw(), + output_typed: outputs + .take(OUT_TYPED) + .expect("No output `OUT_TYPED` for TestOperator") + .typed(|buffer, data| serialize_serde_json(buffer, data, "TestOperator")), + output_raw: outputs + .take(OUT_RAW) + .expect("No output `OUT_RAW` for TestOperator") + .raw(), }) } } #[async_trait] -impl Node for OpTyped { +impl Node for TestOperator 
{ async fn iteration(&self) -> Result<()> { - println!("[OpTyped] iteration being"); + println!("[TestOperator] iteration being"); let link_message = self.input_raw.recv().await?; let (typed_message, _timestamp) = self.input_typed.recv().await?; - if let (LinkMessage::Data(ref mut data_message), Message::Data(data)) = + if let (LinkMessage::Data(ref data_message), Message::Data(data)) = (link_message, typed_message) { - let number = data_message.try_get::()?; - assert_eq!(*number, *data); + // Check the raw input value. + // + // NOTE: in the TestSource iteration we sent the data serialized. Hence we are expecting + // to receive it as `Payload::Bytes`. + match data_message.deref() { + Payload::Bytes(bytes) => { + let value = deserialize_serde_json(bytes.as_slice(), "manual") + .expect("Failed to deserialize bytes with serde_json"); + assert_eq!(value, RAW_VALUE); + } + Payload::Typed((_dyn_data, _)) => { + panic!("Unexpected typed message") + } + } + + // Check the typed input value. + assert_eq!(TYPED_VALUE, *data); - self.output_raw.send(*data, None).await?; + self.output_raw.send(data_message.clone(), None).await?; self.output_typed.send(data, None).await?; } else { panic!("Unexpected watermark message") } - println!("[OpTyped] iteration done"); + println!("[TestOperator] iteration done"); Ok(()) } } +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- // SINK +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- -struct GenericSink { +struct TestSink { input_raw: InputRaw, - input_typed: Input, + input_typed: Input, } #[async_trait] -impl Sink for GenericSink { +impl Sink for TestSink { async fn new( _context: Context, _configuration: Option, mut inputs: Inputs, ) 
-> Result { - println!("[GenericSink] constructor"); - let input_raw = inputs.take_raw(IN_RAW).unwrap(); - let input_typed = inputs.take(IN_TYPED).unwrap(); - - Ok(GenericSink { + println!("[TestSink] constructor"); + let input_raw = inputs.take(IN_RAW).unwrap().raw(); + let input_typed = inputs + .take(IN_TYPED) + .expect("Missing input IN_TYPED for TestSink") + .typed(|bytes| deserialize_serde_json(bytes, "TestSink")); + + Ok(TestSink { input_raw, input_typed, }) @@ -208,20 +219,39 @@ impl Sink for GenericSink { } #[async_trait] -impl Node for GenericSink { +impl Node for TestSink { async fn iteration(&self) -> Result<()> { - println!("[GenericSink] Starting iteration"); + println!("[TestSink] Starting iteration"); let link_message = self.input_raw.recv().await?; let (typed_message, _timestamp) = self.input_typed.recv().await?; - if let (LinkMessage::Data(ref mut data_message), Message::Data(data)) = + if let (LinkMessage::Data(ref data_message), Message::Data(data)) = (link_message, typed_message) { - println!("[GenericSink] Received on input_raw: {data_message:?}"); - println!("[GenericSink] Received on input_typed: {data:?}"); - let number = data_message.try_get::()?; - assert_eq!(*number, *data); + match data_message.deref() { + Payload::Bytes(_) => { + panic!("Unexpected Payload::Bytes") + } + Payload::Typed((dyn_data, _)) => { + let value = (**dyn_data) + .as_any() + .downcast_ref::() + .expect("Failed to downcast"); + // NOTE: Tricky bit, we connected the typed output of the `TestOperator` to the + // raw input of the `TestSink`. Hence, when checking the value we need to + // compare it with `TYPED_VALUE`. + assert_eq!(*value, TYPED_VALUE); + } + } + + // NOTE: Tricky bit, we connected the raw output of the `TestOperator` to the + // typed input of the `TestSink`. Hence, when checking the value we need to + // compare it with `RAW_VALUE`. + // + // We should also see a call to the deserializer function called "in: TestSink!" 
in the + // logs (run the test with `-- --show-output`). + assert_eq!(*data, RAW_VALUE); } else { panic!("Unexpected watermark message") } @@ -230,7 +260,12 @@ impl Node for GenericSink { } } -// Run dataflow in single runtime +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- +// MANUAL INSTANTIATION OF THE DATAFLOW +// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- + async fn single_runtime() { let _ = env_logger::try_init(); @@ -260,10 +295,16 @@ async fn single_runtime() { let source_record = SourceRecord { id: SOURCE.into(), uid: 0, - outputs: vec![PortRecord { - uid: 0, - port_id: OUT_TYPED.into(), - }], + outputs: vec![ + PortRecord { + uid: 0, + port_id: OUT_TYPED.into(), + }, + PortRecord { + uid: 1, + port_id: OUT_RAW.into(), + }, + ], uri: None, configuration: None, runtime: runtime_name.clone(), @@ -273,14 +314,14 @@ async fn single_runtime() { source_record, |context: Context, configuration: Option, outputs: Outputs| { Box::pin(async { - let node = CountSource::new(context, configuration, outputs).await?; + let node = TestSource::new(context, configuration, outputs).await?; Ok(Arc::new(node) as Arc) }) }, ); - let op_raw_record = OperatorRecord { - id: OP_RAW.into(), + let operator_record = OperatorRecord { + id: OPERATOR.into(), uid: 1, inputs: vec![ PortRecord { @@ -308,54 +349,13 @@ async fn single_runtime() { }; dataflow.add_operator( - op_raw_record, - |context: Context, - configuration: Option, - inputs: Inputs, - outputs: Outputs| { - Box::pin(async { - let node = OpRaw::new(context, configuration, inputs, outputs).await?; - Ok(Arc::new(node) as Arc) - }) - }, - ); - - let op_typed_record = OperatorRecord { - id: OP_TYPED.into(), - uid: 2, - 
inputs: vec![ - PortRecord { - uid: 5, - port_id: IN_RAW.into(), - }, - PortRecord { - uid: 6, - port_id: IN_TYPED.into(), - }, - ], - outputs: vec![ - PortRecord { - uid: 7, - port_id: OUT_RAW.into(), - }, - PortRecord { - uid: 8, - port_id: OUT_TYPED.into(), - }, - ], - uri: None, - configuration: None, - runtime: runtime_name.clone(), - }; - - dataflow.add_operator( - op_typed_record, + operator_record, |context: Context, configuration: Option, inputs: Inputs, outputs: Outputs| { Box::pin(async { - let node = OpTyped::new(context, configuration, inputs, outputs).await?; + let node = TestOperator::new(context, configuration, inputs, outputs).await?; Ok(Arc::new(node) as Arc) }) }, @@ -383,20 +383,20 @@ async fn single_runtime() { sink_record, |context: Context, configuration: Option, inputs: Inputs| { Box::pin(async { - let node = GenericSink::new(context, configuration, inputs).await?; + let node = TestSink::new(context, configuration, inputs).await?; Ok(Arc::new(node) as Arc) }) }, ); - // SOURCE -> OP_RAW + // SOURCE -> OPERATOR dataflow.add_link( OutputDescriptor { node: SOURCE.into(), - output: OUT_TYPED.into(), + output: OUT_RAW.into(), }, InputDescriptor { - node: OP_RAW.into(), + node: OPERATOR.into(), input: IN_RAW.into(), }, ); @@ -407,39 +407,29 @@ async fn single_runtime() { output: OUT_TYPED.into(), }, InputDescriptor { - node: OP_RAW.into(), + node: OPERATOR.into(), input: IN_TYPED.into(), }, ); - // OP_RAW -> OP_TYPED + // OPERATOR -> SINK dataflow.add_link( + // NOTE: We are connecting the raw output to the typed input. OutputDescriptor { - node: OP_RAW.into(), - output: OUT_RAW.into(), // CAVEAT: we cross, RAW -> TYPED + node: OPERATOR.into(), + output: OUT_RAW.into(), }, InputDescriptor { - node: OP_TYPED.into(), + node: SINK.into(), input: IN_TYPED.into(), }, ); dataflow.add_link( + // NOTE: We are connecting the typed output to the raw input. 
OutputDescriptor { - node: OP_RAW.into(), - output: OUT_TYPED.into(), // CAVEAT: we cross, TYPED -> RAW - }, - InputDescriptor { - node: OP_TYPED.into(), - input: IN_RAW.into(), - }, - ); - - // OP_TYPED -> SINK - dataflow.add_link( - OutputDescriptor { - node: OP_TYPED.into(), - output: OUT_RAW.into(), // CAVEAT: NO cross, RAW -> RAW + node: OPERATOR.into(), + output: OUT_TYPED.into(), }, InputDescriptor { node: SINK.into(), @@ -447,17 +437,6 @@ async fn single_runtime() { }, ); - dataflow.add_link( - OutputDescriptor { - node: OP_TYPED.into(), - output: OUT_TYPED.into(), // CAVEAT: NO cross, TYPED -> TYPED - }, - InputDescriptor { - node: SINK.into(), - input: IN_TYPED.into(), - }, - ); - let mut instance = DataFlowInstance::try_instantiate(dataflow, hlc.clone()) .await .unwrap();