diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index c127cdde8d3c..b1fae1be60a8 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -1,9 +1,13 @@ use std::hash::Hash; +use std::io; use std::marker::PhantomData; +use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::futures::Sink; +use hydroflow::util::cli::HydroCLI; use proc_macro2::Span; -use stageleft::{IntoQuotedMut, Quoted}; +use quote::quote; +use stageleft::{IntoQuotedMut, Quoted, RuntimeData}; use syn::parse_quote; use crate::HfBuilder; @@ -351,6 +355,79 @@ impl<'a, T> HfStream<'a, T> { } } +impl<'a> HfStream<'a, Bytes> { + pub fn send_to( + &self, + other: usize, + port_name: &str, + cli: RuntimeData<&'a HydroCLI>, + ) -> HfStream<'a, Result> { + let self_ident = &self.ident; + + let cli_splice = cli.splice(); + + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + self.graph + .builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.node_id) + .or_default() + .add_statement(parse_quote! { + #self_ident -> dest_sink({ + use #root::util::cli::ConnectedSink; + #cli_splice + .port(#port_name) + .connect_local_blocking::<#root::util::cli::ConnectedDirect>() + .into_sink() + }); + }); + + let recipient_next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); + + self.graph + .builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(other) + .or_default() + .add_statement(parse_quote! { + #ident = source_stream({ + use #root::util::cli::ConnectedSource; + #cli_splice + .port(#port_name) + .connect_local_blocking::<#root::util::cli::ConnectedDirect>() + .into_source() + }) -> tee(); + }); + + HfStream { + ident, + node_id: other, + graph: self.graph, + _phantom: PhantomData, + } + } +} + impl<'a, K, V1> HfStream<'a, (K, V1)> { pub fn join(&self, n: &HfStream<(K, V2)>) -> HfStream<'a, (K, (V1, V2))> where diff --git a/hydroflow_plus_test/python_tests/basic.py b/hydroflow_plus_test/python_tests/basic.py index 56f01eef7501..2061a37516ab 100644 --- a/hydroflow_plus_test/python_tests/basic.py +++ b/hydroflow_plus_test/python_tests/basic.py @@ -33,7 +33,7 @@ async def test_networked_basic(): sender_port = sender.client_port() sender_port.send_to(program_zero.ports.node_zero_input) - program_zero.ports.node_zero_output.send_to(program_one.ports.node_one_input) + program_zero.ports.zero_to_one.send_to(program_one.ports.zero_to_one) await deployment.deploy() diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index dc425af49406..454a2f2b7398 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -1,5 +1,5 @@ use hydroflow::bytes::BytesMut; -use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource, HydroCLI}; +use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, HydroCLI}; use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::HfBuilder; use stageleft::{q, Quoted, RuntimeData}; @@ -20,28 +20,14 @@ pub fn networked_basic<'a>( ); source_zero - .map(q!(|v: Result| v.unwrap().freeze())) - .dest_sink(q!({ - cli.port("node_zero_output") - .connect_local_blocking::() - .into_sink() + .map(q!(|v| v.unwrap().freeze())) + .send_to(1, "zero_to_one", cli) + .for_each(q!(|v: Result| { + println!( + "node one received: {:?}", + std::str::from_utf8(&v.unwrap()).unwrap() + ); })); - let source_one = graph.source_stream( - 1, - q!({ - cli.port("node_one_input") - .connect_local_blocking::() - .into_source() - }), - ); - - source_one.for_each(q!(|v: Result| { - println!( - "node one received: {:?}", - std::str::from_utf8(&v.unwrap()).unwrap() - ); - })); - graph.build(node_id) }