Skip to content

Commit

Permalink
feat(hydroflow): add preliminary send_to operator for multi-node gr…
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Dec 16, 2023
1 parent 11f81ac commit 3b30b66
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 24 deletions.
79 changes: 78 additions & 1 deletion hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::hash::Hash;
use std::io;
use std::marker::PhantomData;

use hydroflow::bytes::{Bytes, BytesMut};
use hydroflow::futures::Sink;
use hydroflow::util::cli::HydroCLI;
use proc_macro2::Span;
use stageleft::{IntoQuotedMut, Quoted};
use quote::quote;
use stageleft::{IntoQuotedMut, Quoted, RuntimeData};
use syn::parse_quote;

use crate::HfBuilder;
Expand Down Expand Up @@ -351,6 +355,79 @@ impl<'a, T> HfStream<'a, T> {
}
}

impl<'a> HfStream<'a, Bytes> {
pub fn send_to(
&self,
other: usize,
port_name: &str,
cli: RuntimeData<&'a HydroCLI>,
) -> HfStream<'a, Result<BytesMut, io::Error>> {
let self_ident = &self.ident;

let cli_splice = cli.splice();

let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus")
.expect("hydroflow_plus should be present in `Cargo.toml`");
let root = match hydroflow_crate {
proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus },
proc_macro_crate::FoundCrate::Name(name) => {
let ident = syn::Ident::new(&name, Span::call_site());
quote! { #ident }
}
};

self.graph
.builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.node_id)
.or_default()
.add_statement(parse_quote! {
#self_ident -> dest_sink({
use #root::util::cli::ConnectedSink;
#cli_splice
.port(#port_name)
.connect_local_blocking::<#root::util::cli::ConnectedDirect>()
.into_sink()
});
});

let recipient_next_id = {
let mut next_id = self.graph.next_id.borrow_mut();
let id = *next_id;
*next_id += 1;
id
};

let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site());

self.graph
.builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(other)
.or_default()
.add_statement(parse_quote! {
#ident = source_stream({
use #root::util::cli::ConnectedSource;
#cli_splice
.port(#port_name)
.connect_local_blocking::<#root::util::cli::ConnectedDirect>()
.into_source()
}) -> tee();
});

HfStream {
ident,
node_id: other,
graph: self.graph,
_phantom: PhantomData,
}
}
}

impl<'a, K, V1> HfStream<'a, (K, V1)> {
pub fn join<V2>(&self, n: &HfStream<(K, V2)>) -> HfStream<'a, (K, (V1, V2))>
where
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/python_tests/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def test_networked_basic():
sender_port = sender.client_port()
sender_port.send_to(program_zero.ports.node_zero_input)

program_zero.ports.node_zero_output.send_to(program_one.ports.node_one_input)
program_zero.ports.zero_to_one.send_to(program_one.ports.zero_to_one)

await deployment.deploy()

Expand Down
30 changes: 8 additions & 22 deletions hydroflow_plus_test/src/networked.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use hydroflow::bytes::BytesMut;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource, HydroCLI};
use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, HydroCLI};
use hydroflow_plus::scheduled::graph::Hydroflow;
use hydroflow_plus::HfBuilder;
use stageleft::{q, Quoted, RuntimeData};
Expand All @@ -20,28 +20,14 @@ pub fn networked_basic<'a>(
);

source_zero
.map(q!(|v: Result<BytesMut, _>| v.unwrap().freeze()))
.dest_sink(q!({
cli.port("node_zero_output")
.connect_local_blocking::<ConnectedDirect>()
.into_sink()
.map(q!(|v| v.unwrap().freeze()))
.send_to(1, "zero_to_one", cli)
.for_each(q!(|v: Result<BytesMut, _>| {
println!(
"node one received: {:?}",
std::str::from_utf8(&v.unwrap()).unwrap()
);
}));

let source_one = graph.source_stream(
1,
q!({
cli.port("node_one_input")
.connect_local_blocking::<ConnectedDirect>()
.into_source()
}),
);

source_one.for_each(q!(|v: Result<BytesMut, _>| {
println!(
"node one received: {:?}",
std::str::from_utf8(&v.unwrap()).unwrap()
);
}));

graph.build(node_id)
}

0 comments on commit 3b30b66

Please sign in to comment.