Skip to content

Commit

Permalink
refacto: renamed build_raw and build_typed to raw and typed
Browse files Browse the repository at this point in the history
Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed May 2, 2023
1 parent b6c669c commit f9743d3
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 35 deletions.
22 changes: 11 additions & 11 deletions zenoh-flow/src/io/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use uhlc::Timestamp;
///
/// ```ignore
/// let input_builder = inputs.take("test raw").expect("No input name 'test raw' found");
/// let input_raw = input_builder.build_raw();
/// let input_raw = input_builder.raw();
///
/// let input_builder = inputs.take("test typed").expect("No input name 'test typed' found");
/// let input: Input<u64> = input_build.build_typed(
/// let input: Input<u64> = input_build.typed(
/// |bytes| serde_json::from_slice(bytes)
/// .map_err(|e| anyhow::anyhow!(e))
/// )?;
Expand Down Expand Up @@ -91,27 +91,27 @@ impl Inputs {
///
/// ## Typed
///
/// To obtain an [`Input<T>`] one must call `build_typed` and provide a deserializer function. In
/// To obtain an [`Input<T>`] one must call `typed` and provide a deserializer function. In
/// the example below we rely on the `serde_json` crate to do the deserialization.
///
/// ```ignore
/// let input_typed: Input<u64> = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// .build_typed(
/// .typed(
/// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e))
/// );
/// ```
///
/// ## Raw
///
/// To obtain an [InputRaw] one must call `build_raw`.
/// To obtain an [InputRaw] one must call `raw`.
///
/// ```ignore
/// let input_raw: InputRaw = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// .build_raw();
/// .raw();
/// ```
pub fn take(&mut self, port_id: impl AsRef<str>) -> Option<InputBuilder> {
self.hmap
Expand Down Expand Up @@ -160,9 +160,9 @@ impl InputBuilder {
/// let input_raw: InputRaw = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// .build_raw();
/// .raw();
/// ```
pub fn build_raw(self) -> InputRaw {
pub fn raw(self) -> InputRaw {
InputRaw {
port_id: self.port_id,
receivers: self.receivers,
Expand All @@ -186,16 +186,16 @@ impl InputBuilder {
/// let input_typed: Input<u64> = inputs
/// .take("test")
/// .expect("No input named 'test' found")
/// .build_typed(
/// .typed(
/// |bytes: &[u8]| serde_json::from_slice(bytes).map_err(|e| anyhow::anyhow!(e))
/// );
/// ```
pub fn build_typed<T>(
pub fn typed<T>(
self,
deserializer: impl Fn(&[u8]) -> anyhow::Result<T> + Send + Sync + 'static,
) -> Input<T> {
Input {
input_raw: self.build_raw(),
input_raw: self.raw(),
deserializer: Arc::new(deserializer),
}
}
Expand Down
18 changes: 9 additions & 9 deletions zenoh-flow/src/io/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,25 @@ impl Outputs {
///
/// ## Typed
///
/// To obtain an [`Output<T>`] one must call `build_typed` and provide a serializer function. In
/// To obtain an [`Output<T>`] one must call `typed` and provide a serializer function. In
/// the example below we rely on the `serde_json` crate to do the serialization.
///
/// ```ignore
/// let output_typed: Output<u64> = outputs
/// .take("test")
/// .expect("No key named 'test' found")
/// .build_typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e)));
/// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e)));
/// ```
///
/// ## Raw
///
/// To obtain an [OutputRaw] one must call `build_raw`.
/// To obtain an [OutputRaw] one must call `raw`.
///
/// ```ignore
/// let output_raw = outputs
/// .take("test")
/// .expect("No key named 'test' found")
/// .build_raw();
/// .raw();
/// ```
pub fn take(&mut self, port_id: impl AsRef<str>) -> Option<OutputBuilder> {
self.hmap
Expand Down Expand Up @@ -152,9 +152,9 @@ impl OutputBuilder {
/// let output_raw = outputs
/// .take("test")
/// .expect("No key named 'test' found")
/// .build_raw();
/// .raw();
/// ```
pub fn build_raw(self) -> OutputRaw {
pub fn raw(self) -> OutputRaw {
OutputRaw {
port_id: self.port_id,
senders: self.senders,
Expand Down Expand Up @@ -186,15 +186,15 @@ impl OutputBuilder {
/// let output_typed: Output<u64> = outputs
/// .take("test")
/// .expect("No key named 'test' found")
/// .build_typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e)));
/// .typed(|data: &u64| serde_json::to_vec(data).map_err(|e| anyhow::anyhow!(e)));
/// ```
pub fn build_typed<T: Send + Sync + 'static>(
pub fn typed<T: Send + Sync + 'static>(
self,
serializer: impl Fn(&mut Vec<u8>, &T) -> anyhow::Result<()> + Send + Sync + 'static,
) -> Output<T> {
Output {
_phantom: PhantomData,
output_raw: self.build_raw(),
output_raw: self.raw(),
serializer: Arc::new(move |buffer, data| {
if let Some(typed) = (*data).as_any().downcast_ref::<T>() {
match (serializer)(buffer, typed) {
Expand Down
2 changes: 1 addition & 1 deletion zenoh-flow/src/io/tests/output-tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn test_typed_output<T: Send + Sync + Clone + std::fmt::Debug + PartialEq + 'sta
let output = outputs
.take(&key)
.expect("Wrong key provided")
.build_typed(serializer);
.typed(serializer);

output
.try_send(expected_data.clone(), None)
Expand Down
4 changes: 2 additions & 2 deletions zenoh-flow/src/runtime/dataflow/instance/builtin/zenoh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<'a> Source for ZenohSource<'a> {
ErrorKind::MissingOutput(id.clone()),
"Unable to find output: {id}"
))?
.build_raw();
.raw();
let subscriber = context
.zenoh_session()
.declare_subscriber(&ke)
Expand Down Expand Up @@ -427,7 +427,7 @@ impl<'a> Sink for ZenohSink<'a> {
ErrorKind::MissingInput(id.clone()),
"Unable to find input: {id}"
))?
.build_raw();
.raw();
let subscriber = context.zenoh_session().declare_publisher(ke).res().await?;

publishers.insert(id.clone().into(), subscriber);
Expand Down
8 changes: 4 additions & 4 deletions zenoh-flow/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<T: 'static + Send + Sync> SendSyncAny for T {
/// let output = outputs
/// .take("out")
/// .expect("No output called 'out' found")
/// .build_typed(|buffer, data| todo!("Provide your serializer here"));
/// .typed(|buffer, data| todo!("Provide your serializer here"));
///
/// Ok(Self { output })
/// }
Expand Down Expand Up @@ -148,7 +148,7 @@ pub trait Source: Node + Send + Sync {
/// let input = inputs
/// .take("in")
/// .expect("No input called 'in' found")
/// .build_typed(|bytes| todo!("Provide your deserializer here"));
/// .typed(|bytes| todo!("Provide your deserializer here"));
///
/// Ok(GenericSink { input })
/// }
Expand Down Expand Up @@ -222,11 +222,11 @@ pub trait Sink: Node + Send + Sync {
/// input: inputs
/// .take("in")
/// .expect("No input called 'in' found")
/// .build_typed(|bytes| todo!("Provide your deserializer here")),
/// .typed(|bytes| todo!("Provide your deserializer here")),
/// output: outputs
/// .take("out")
/// .expect("No output called 'out' found")
/// .build_typed(|buffer, data| todo!("Provide your serializer here")),
/// .typed(|buffer, data| todo!("Provide your serializer here")),
/// })
/// }
/// }
Expand Down
16 changes: 8 additions & 8 deletions zenoh-flow/tests/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ impl Source for TestSource {
let output = outputs
.take(OUT_TYPED)
.expect("No `OUT_TYPED` for TestSource")
.build_typed(|buffer, data| serialize_serde_json(buffer, data, "TestSource"));
.typed(|buffer, data| serialize_serde_json(buffer, data, "TestSource"));
let output_raw = outputs
.take(OUT_RAW)
.expect("No `OUT_RAW` for TestSource")
.build_raw();
.raw();

Ok(TestSource { output, output_raw })
}
Expand Down Expand Up @@ -129,19 +129,19 @@ impl Operator for TestOperator {
input_typed: inputs
.take(IN_TYPED)
.expect("No input `IN_TYPED` for TestOperator")
.build_typed(|bytes| deserialize_serde_json(bytes, "TestOperator")),
.typed(|bytes| deserialize_serde_json(bytes, "TestOperator")),
input_raw: inputs
.take(IN_RAW)
.expect("No input `IN_RAW` for TestOperator")
.build_raw(),
.raw(),
output_typed: outputs
.take(OUT_TYPED)
.expect("No output `OUT_TYPED` for TestOperator")
.build_typed(|buffer, data| serialize_serde_json(buffer, data, "TestOperator")),
.typed(|buffer, data| serialize_serde_json(buffer, data, "TestOperator")),
output_raw: outputs
.take(OUT_RAW)
.expect("No output `OUT_RAW` for TestOperator")
.build_raw(),
.raw(),
})
}
}
Expand Down Expand Up @@ -205,11 +205,11 @@ impl Sink for TestSink {
mut inputs: Inputs,
) -> Result<Self> {
println!("[TestSink] constructor");
let input_raw = inputs.take(IN_RAW).unwrap().build_raw();
let input_raw = inputs.take(IN_RAW).unwrap().raw();
let input_typed = inputs
.take(IN_TYPED)
.expect("Missing input IN_TYPED for TestSink")
.build_typed(|bytes| deserialize_serde_json(bytes, "TestSink"));
.typed(|bytes| deserialize_serde_json(bytes, "TestSink"));

Ok(TestSink {
input_raw,
Expand Down

0 comments on commit f9743d3

Please sign in to comment.