diff --git a/Cargo.lock b/Cargo.lock index c10dd202a585..6a0943e9fc83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1490,12 +1490,25 @@ dependencies = [ "syn 2.0.14", ] +[[package]] +name = "hydroflow_plus_cli_integration" +version = "0.5.0" +dependencies = [ + "async-channel", + "hydro_cli", + "hydroflow_plus", + "stageleft", + "tokio", +] + [[package]] name = "hydroflow_plus_test" version = "0.0.0" dependencies = [ + "hydro_cli", "hydroflow", "hydroflow_plus", + "hydroflow_plus_cli_integration", "hydroflow_plus_test_macro", "insta", "regex", diff --git a/Cargo.toml b/Cargo.toml index 1ef74afd2113..5b886f148ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "hydroflow_lang", "hydroflow_macro", "hydroflow_plus", + "hydroflow_plus_cli_integration", "hydroflow_plus_test", "hydroflow_plus_test_macro", "lattices", diff --git a/hydro_cli/Cargo.toml b/hydro_cli/Cargo.toml index 4c802528b34f..7338b7da0623 100644 --- a/hydro_cli/Cargo.toml +++ b/hydro_cli/Cargo.toml @@ -8,9 +8,9 @@ documentation = "https://docs.rs/hydro_cli/" description = "Hydro Deploy Command Line Interface" [lib] -name = "hydro" +name = "hydro_cli" # "cdylib" is necessary to produce a shared library for Python to import from. -crate-type = ["cdylib"] +crate-type = ["lib", "cdylib"] [dependencies] tokio = { version = "1.16", features = [ "full" ] } diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs index 3f5695f3a90a..8ab8c990b441 100644 --- a/hydro_cli/src/core/custom_service.rs +++ b/hydro_cli/src/core/custom_service.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Weak}; use anyhow::{bail, Result}; use async_trait::async_trait; -use hydroflow_cli_integration::ServerPort; +use hydroflow_cli_integration::{ConnectedDirect, ServerPort}; use tokio::sync::RwLock; use super::hydroflow_crate::ports::{ @@ -32,6 +32,10 @@ impl CustomService { launched_host: None, } } + + pub fn create_port(&self, self_arc: &Arc>) -> CustomClientPort { + CustomClientPort::new(Arc::downgrade(self_arc)) + } } #[async_trait] @@ -95,6 +99,17 @@ impl CustomClientPort { .load_instantiated(&|p| p) .await } + + pub async fn connect(&self) -> ConnectedDirect { + self.client_port + .as_ref() + .unwrap() + .load_instantiated(&|p| p) + .await + .instantiate() + .connect::() + .await + } } impl HydroflowSource for CustomClientPort { diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs index 7cc7a1442239..f2a4430c939c 100644 --- a/hydro_cli/src/core/deployment.rs +++ b/hydro_cli/src/core/deployment.rs @@ -1,15 +1,19 @@ -use std::sync::{Arc, Weak}; +use std::path::PathBuf; +use std::sync::Arc; use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use tokio::sync::RwLock; -use super::{progress, Host, ResourcePool, ResourceResult, Service}; +use super::{ + progress, CustomService, Host, HydroflowCrate, LocalhostHost, ResourcePool, ResourceResult, + Service, +}; #[derive(Default)] pub struct Deployment { pub hosts: Vec>>, - pub services: Vec>>, + pub services: Vec>>, pub resource_pool: ResourcePool, last_resource_result: Option>, next_host_id: usize, @@ -17,24 +21,59 @@ pub struct Deployment { } impl Deployment { + pub fn new() -> Self { + Self::default() + } + + #[allow(non_snake_case)] + pub fn Localhost(&mut self) -> Arc> { + self.add_host(LocalhostHost::new) + } + + #[allow(non_snake_case)] + pub fn CustomService( + &mut self, + on: Arc>, + external_ports: Vec, + ) -> Arc> { + self.add_service(|id| CustomService::new(id, on, external_ports)) + } + + #[allow(non_snake_case, clippy::too_many_arguments)] + pub fn HydroflowCrate( + &mut self, + src: impl Into, + on: Arc>, + bin: Option, + example: Option, + profile: Option, + features: Option>, + args: Option>, + display_id: Option, + external_ports: Vec, + ) -> Arc> { + self.add_service(|id| { + crate::core::HydroflowCrate::new( + id, + src.into(), + on, + bin, + example, + profile, + features, + args, + display_id, + external_ports, + ) + }) + } + pub async fn deploy(&mut self) -> Result<()> { progress::ProgressTracker::with_group("deploy", None, || async { let mut resource_batch = super::ResourceBatch::new(); - let active_services = self - .services - .iter() - .filter(|service| service.upgrade().is_some()) - .cloned() - .collect::>(); - self.services = active_services; for service in self.services.iter_mut() { - service - .upgrade() - .unwrap() - .write() - .await - .collect_resources(&mut resource_batch); + service.write().await.collect_resources(&mut resource_batch); } for host in self.hosts.iter_mut() { @@ -66,14 +105,8 @@ impl Deployment { let services_future = self .services .iter_mut() - .map(|service: &mut Weak>| async { - service - .upgrade() - .unwrap() - .write() - .await - .deploy(&result) - .await + .map(|service: &mut Arc>| async { + service.write().await.deploy(&result).await }) .collect::>(); @@ -87,8 +120,8 @@ impl Deployment { let all_services_ready = self.services .iter() - .map(|service: &Weak>| async { - service.upgrade().unwrap().write().await.ready().await?; + .map(|service: &Arc>| async { + service.write().await.ready().await?; Ok(()) as Result<()> }); @@ -102,20 +135,12 @@ impl Deployment { } pub async fn start(&mut self) -> Result<()> { - let active_services = self - .services - .iter() - .filter(|service| service.upgrade().is_some()) - .cloned() - .collect::>(); - self.services = active_services; - progress::ProgressTracker::with_group("start", None, || { let all_services_start = self.services .iter() - .map(|service: &Weak>| async { - service.upgrade().unwrap().write().await.start().await?; + .map(|service: &Arc>| async { + service.write().await.start().await?; Ok(()) as Result<()> }); @@ -144,7 +169,7 @@ impl Deployment { self.next_service_id += 1; let dyn_arc: Arc> = arc.clone(); - self.services.push(Arc::downgrade(&dyn_arc)); + self.services.push(dyn_arc); arc } } diff --git a/hydro_cli/src/core/localhost.rs b/hydro_cli/src/core/localhost.rs index 090969e24d4d..1647ff345008 100644 --- a/hydro_cli/src/core/localhost.rs +++ b/hydro_cli/src/core/localhost.rs @@ -128,15 +128,15 @@ pub fn create_broadcast( } if let Some(cli_receivers) = weak_cli_receivers.upgrade() { - let cli_receivers = cli_receivers.write().await; - for r in cli_receivers.iter() { + let mut cli_receivers = cli_receivers.write().await; + for r in cli_receivers.drain(..) { r.close(); } } if let Some(receivers) = weak_receivers.upgrade() { - let receivers = receivers.write().await; - for r in receivers.iter() { + let mut receivers = receivers.write().await; + for r in receivers.drain(..) { r.close(); } } diff --git a/hydro_cli/src/lib.rs b/hydro_cli/src/lib.rs index 59c2c3e80ea7..0343ee1282ce 100644 --- a/hydro_cli/src/lib.rs +++ b/hydro_cli/src/lib.rs @@ -159,10 +159,7 @@ impl Deployment { #[allow(non_snake_case)] fn Localhost(&self, py: Python<'_>) -> PyResult> { - let arc = self - .underlying - .blocking_write() - .add_host(crate::core::LocalhostHost::new); + let arc = self.underlying.blocking_write().Localhost(); Ok(Py::new( py, @@ -214,9 +211,10 @@ impl Deployment { on: &Host, external_ports: Vec, ) -> PyResult> { - let service = self.underlying.blocking_write().add_service(|id| { - crate::core::CustomService::new(id, on.underlying.clone(), external_ports) - }); + let service = self + .underlying + .blocking_write() + .CustomService(on.underlying.clone(), external_ports); Ok(Py::new( py, @@ -244,20 +242,17 @@ impl Deployment { display_id: Option, external_ports: Option>, ) -> PyResult> { - let service = self.underlying.blocking_write().add_service(|id| { - crate::core::HydroflowCrate::new( - id, - src.into(), - on.underlying.clone(), - bin, - example, - profile, - features, - args, - display_id, - external_ports.unwrap_or_default(), - ) - }); + let service = self.underlying.blocking_write().HydroflowCrate( + src, + on.underlying.clone(), + bin, + example, + profile, + features, + args, + display_id, + external_ports.unwrap_or_default(), + ); Ok(Py::new( py, diff --git a/hydroflow_cli_integration/src/lib.rs b/hydroflow_cli_integration/src/lib.rs index 8841c5157f4c..5e85269b0568 100644 --- a/hydroflow_cli_integration/src/lib.rs +++ b/hydroflow_cli_integration/src/lib.rs @@ -41,6 +41,12 @@ pub enum ServerPort { Null, } +impl ServerPort { + pub fn instantiate(&self) -> ServerOrBound { + ServerOrBound::Server(self.into()) + } +} + #[derive(Debug)] pub enum RealizedServerPort { UnixSocket(JoinHandle>), diff --git a/hydroflow_plus/src/builder.rs b/hydroflow_plus/src/builder.rs index 256423a011ff..8ed1d3e0c39a 100644 --- a/hydroflow_plus/src/builder.rs +++ b/hydroflow_plus/src/builder.rs @@ -1,7 +1,9 @@ use std::cell::RefCell; use std::collections::BTreeMap; +use std::io; use std::marker::PhantomData; +use hydroflow::bytes::BytesMut; use hydroflow::futures::stream::Stream; use hydroflow_lang::graph::{ eliminate_extra_unions_tees, partition_graph, propegate_flow_props, FlatGraphBuilder, @@ -11,11 +13,13 @@ use quote::quote; use stageleft::{IntoQuotedOnce, Quoted, QuotedContext}; use syn::parse_quote; +use crate::node::HfNode; use crate::{HfBuilt, HfStream, RuntimeContext}; pub struct HfBuilder<'a> { pub(crate) next_id: RefCell, pub(crate) builders: RefCell>>, + next_node_id: RefCell, _phantom: PhantomData<&'a mut &'a ()>, } @@ -31,10 +35,18 @@ impl<'a> HfBuilder<'a> { HfBuilder { next_id: RefCell::new(0), builders: RefCell::new(Some(Default::default())), + next_node_id: RefCell::new(0), _phantom: PhantomData, } } + pub fn next_node_id(&self) -> usize { + let mut next_node_id = self.next_node_id.borrow_mut(); + let id = *next_node_id; + *next_node_id += 1; + id + } + pub fn build(&self, id: impl Quoted<'a, usize>) -> HfBuilt<'a> { let builders = self.builders.borrow_mut().take().unwrap(); @@ -100,11 +112,11 @@ impl<'a> HfBuilder<'a> { } } - pub fn source_stream + Unpin>( + pub fn source_stream + Unpin, N: HfNode<'a>>( &'a self, - node_id: usize, + node: &N, e: impl Quoted<'a, E>, - ) -> HfStream<'a, T> { + ) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.next_id.borrow_mut(); let id = *next_id; @@ -119,7 +131,7 @@ impl<'a> HfBuilder<'a> { .borrow_mut() .as_mut() .unwrap() - .entry(node_id) + .entry(node.id()) .or_default() .add_statement(parse_quote! { #ident = source_stream(#e) -> tee(); @@ -127,17 +139,66 @@ impl<'a> HfBuilder<'a> { HfStream { ident, - node_id, + node: node.clone(), + graph: self, + _phantom: PhantomData, + } + } + + pub fn source_port>( + &'a self, + node: &N, + port: &str, + ) -> HfStream<'a, Result, N> { + let next_id = { + let mut next_id = self.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let cli_splice = node.get_cli().splice(); + + let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") + .expect("hydroflow_plus should be present in `Cargo.toml`"); + let root = match hydroflow_crate { + proc_macro_crate::FoundCrate::Itself => quote! { hydroflow_plus }, + proc_macro_crate::FoundCrate::Name(name) => { + let ident = syn::Ident::new(&name, Span::call_site()); + quote! { #ident } + } + }; + + self.builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(node.id()) + .or_default() + .add_statement(parse_quote! { + #ident = source_stream({ + use #root::util::cli::ConnectedSource; + #cli_splice + .port(#port) + .connect_local_blocking::<#root::util::cli::ConnectedDirect>() + .into_source() + }) -> tee(); + }); + + HfStream { + ident, + node: node.clone(), graph: self, _phantom: PhantomData, } } - pub fn source_iter>( + pub fn source_iter, N: HfNode<'a>>( &'a self, - node_id: usize, + node: &N, e: impl IntoQuotedOnce<'a, E>, - ) -> HfStream<'a, T> { + ) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.next_id.borrow_mut(); let id = *next_id; @@ -152,7 +213,7 @@ impl<'a> HfBuilder<'a> { .borrow_mut() .as_mut() .unwrap() - .entry(node_id) + .entry(node.id()) .or_default() .add_statement(parse_quote! { #ident = source_iter(#e) -> tee(); @@ -160,13 +221,13 @@ impl<'a> HfBuilder<'a> { HfStream { ident, - node_id, + node: node.clone(), graph: self, _phantom: PhantomData, } } - pub fn cycle(&'a self, node_id: usize) -> (HfCycle<'a, T>, HfStream<'a, T>) { + pub fn cycle>(&'a self, node: &N) -> (HfCycle<'a, T, N>, HfStream<'a, T, N>) { let next_id = { let mut next_id = self.next_id.borrow_mut(); let id = *next_id; @@ -180,7 +241,7 @@ impl<'a> HfBuilder<'a> { .borrow_mut() .as_mut() .unwrap() - .entry(node_id) + .entry(node.id()) .or_default() .add_statement(parse_quote! { #ident = tee(); @@ -189,13 +250,13 @@ impl<'a> HfBuilder<'a> { ( HfCycle { ident: ident.clone(), - node_id, + node: node.clone(), graph: self, _phantom: PhantomData, }, HfStream { ident, - node_id, + node: node.clone(), graph: self, _phantom: PhantomData, }, @@ -203,15 +264,15 @@ impl<'a> HfBuilder<'a> { } } -pub struct HfCycle<'a, T> { +pub struct HfCycle<'a, T, N: HfNode<'a>> { ident: syn::Ident, - node_id: usize, + node: N, graph: &'a HfBuilder<'a>, _phantom: PhantomData, } -impl<'a, T> HfCycle<'a, T> { - pub fn complete(self, stream: &HfStream<'a, T>) { +impl<'a, T, N: HfNode<'a>> HfCycle<'a, T, N> { + pub fn complete(self, stream: &HfStream<'a, T, N>) { let ident = self.ident; let stream_ident = stream.ident.clone(); @@ -220,7 +281,7 @@ impl<'a, T> HfCycle<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #stream_ident -> #ident; diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index f51cb79e4761..3139dff812c5 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -11,6 +11,8 @@ use stageleft::Quoted; mod stream; pub use stream::HfStream; +pub mod node; + mod builder; pub use builder::HfBuilder; diff --git a/hydroflow_plus/src/node.rs b/hydroflow_plus/src/node.rs new file mode 100644 index 000000000000..82bafc786524 --- /dev/null +++ b/hydroflow_plus/src/node.rs @@ -0,0 +1,116 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use hydroflow::util::cli::HydroCLI; +use stageleft::RuntimeData; + +use crate::HfBuilder; + +pub trait HfNode<'a>: Clone { + fn id(&self) -> usize; + fn next_port(&self) -> String; + fn get_cli(&self) -> RuntimeData<&'a HydroCLI>; +} + +impl<'a> HfNode<'a> for () { + fn id(&self) -> usize { + 0 + } + + fn next_port(&self) -> String { + panic!(); + } + + fn get_cli(&self) -> RuntimeData<&'a HydroCLI> { + panic!(); + } +} + +impl<'a> HfNode<'a> for usize { + fn id(&self) -> usize { + *self + } + + fn next_port(&self) -> String { + panic!(); + } + + fn get_cli(&self) -> RuntimeData<&'a HydroCLI> { + panic!(); + } +} + +#[derive(Clone)] +pub struct CLIRuntimeNode<'a> { + id: usize, + next_port: Rc>, + cli: RuntimeData<&'a HydroCLI>, +} + +impl<'a> CLIRuntimeNode<'a> { + pub fn new(id: usize, cli: RuntimeData<&'a HydroCLI>) -> CLIRuntimeNode { + CLIRuntimeNode { + id, + next_port: Rc::new(RefCell::new(0)), + cli, + } + } +} + +impl<'a> HfNode<'a> for CLIRuntimeNode<'a> { + fn id(&self) -> usize { + self.id + } + + fn next_port(&self) -> String { + let next_send_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + format!("port_{}", next_send_port) + } + + fn get_cli(&self) -> RuntimeData<&'a HydroCLI> { + self.cli + } +} + +pub trait HfConnectable<'a, O: HfNode<'a>> { + fn connect(&self, other: &O, source_port: &str, recipient_port: &str); +} + +impl<'a> HfConnectable<'a, CLIRuntimeNode<'a>> for CLIRuntimeNode<'a> { + fn connect(&self, _other: &CLIRuntimeNode, _source_port: &str, _recipient_port: &str) {} +} + +pub trait HFNodeBuilder<'a, N> { + fn build(&mut self, graph: &'a HfBuilder<'a>) -> N; +} + +pub struct CLIRuntimeNodeBuilder<'a> { + cli: RuntimeData<&'a HydroCLI>, +} + +impl CLIRuntimeNodeBuilder<'_> { + pub fn new(cli: RuntimeData<&HydroCLI>) -> CLIRuntimeNodeBuilder { + CLIRuntimeNodeBuilder { cli } + } +} + +impl<'a> HFNodeBuilder<'a, CLIRuntimeNode<'a>> for CLIRuntimeNodeBuilder<'a> { + fn build(&mut self, builder: &'a HfBuilder<'a>) -> CLIRuntimeNode<'a> { + CLIRuntimeNode::new(builder.next_node_id(), self.cli) + } +} + +pub trait HFDeploy<'a> { + type Node: HfNode<'a> + HfConnectable<'a, Self::Node>; + type NodeBuilder: HFNodeBuilder<'a, Self::Node>; +} + +pub struct CLIRuntime<'b> { + _marker: std::marker::PhantomData<&'b ()>, +} + +impl<'a: 'b, 'b> HFDeploy<'a> for CLIRuntime<'b> { + type Node = CLIRuntimeNode<'a>; + type NodeBuilder = CLIRuntimeNodeBuilder<'a>; +} diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index b1fae1be60a8..9f8e7532f8d3 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -4,23 +4,23 @@ use std::marker::PhantomData; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::futures::Sink; -use hydroflow::util::cli::HydroCLI; use proc_macro2::Span; use quote::quote; -use stageleft::{IntoQuotedMut, Quoted, RuntimeData}; +use stageleft::{IntoQuotedMut, Quoted}; use syn::parse_quote; +use crate::node::{HfConnectable, HfNode}; use crate::HfBuilder; -pub struct HfStream<'a, T> { +pub struct HfStream<'a, T, N: HfNode<'a>> { pub(crate) ident: syn::Ident, - pub(crate) node_id: usize, + pub(crate) node: N, pub(crate) graph: &'a HfBuilder<'a>, pub(crate) _phantom: PhantomData<&'a mut &'a T>, } -impl<'a, T> HfStream<'a, T> { - pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U> { +impl<'a, T, N: HfNode<'a>> HfStream<'a, T, N> { + pub fn map U + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, U, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -37,7 +37,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> map(#f) -> tee(); @@ -45,13 +45,46 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn filter bool + 'a>(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T> { + pub fn inspect(&self, f: impl IntoQuotedMut<'a, F>) -> HfStream<'a, T, N> { + let next_id = { + let mut next_id = self.graph.next_id.borrow_mut(); + let id = *next_id; + *next_id += 1; + id + }; + let self_ident = &self.ident; + let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); + let f = f.splice(); + + self.graph + .builders + .borrow_mut() + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default() + .add_statement(parse_quote! { + #ident = #self_ident -> inspect(#f) -> tee(); + }); + + HfStream { + ident, + node: self.node.clone(), + graph: self.graph, + _phantom: PhantomData, + } + } + + pub fn filter bool + 'a>( + &self, + f: impl IntoQuotedMut<'a, F>, + ) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -68,7 +101,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> filter(#f) -> tee(); @@ -76,7 +109,7 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } @@ -85,7 +118,7 @@ impl<'a, T> HfStream<'a, T> { pub fn filter_map Option + 'a>( &self, f: impl IntoQuotedMut<'a, F>, - ) -> HfStream<'a, U> { + ) -> HfStream<'a, U, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -102,7 +135,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> filter_map(#f) -> tee(); @@ -110,7 +143,7 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } @@ -120,7 +153,7 @@ impl<'a, T> HfStream<'a, T> { &self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, C>, - ) -> HfStream<'a, A> { + ) -> HfStream<'a, A, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -137,7 +170,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> fold(#init, #comb) -> tee(); @@ -145,13 +178,13 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn persist(&self) -> HfStream<'a, T> { + pub fn persist(&self) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -167,7 +200,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> persist() -> tee(); @@ -175,13 +208,13 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn delta(&self) -> HfStream<'a, T> { + pub fn delta(&self) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -197,7 +230,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> multiset_delta() -> tee(); @@ -205,13 +238,13 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn unique(&self) -> HfStream<'a, T> + pub fn unique(&self) -> HfStream<'a, T, N> where T: Eq + Hash, { @@ -230,7 +263,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> unique::<'tick>() -> tee(); @@ -238,13 +271,13 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn cross_product(&self, other: &HfStream) -> HfStream<'a, (T, O)> { + pub fn cross_product(&self, other: &HfStream<'a, O, N>) -> HfStream<'a, (T, O), N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -257,7 +290,11 @@ impl<'a, T> HfStream<'a, T> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = cross_join::<'tick, 'tick>() -> tee(); @@ -273,13 +310,13 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } } - pub fn union(&self, other: &HfStream) -> HfStream<'a, T> { + pub fn union(&self, other: &HfStream<'a, T, N>) -> HfStream<'a, T, N> { let next_id = { let mut next_id = self.graph.next_id.borrow_mut(); let id = *next_id; @@ -292,7 +329,11 @@ impl<'a, T> HfStream<'a, T> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = union() -> tee(); @@ -308,7 +349,7 @@ impl<'a, T> HfStream<'a, T> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } @@ -331,7 +372,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #ident = #self_ident -> for_each(#f); @@ -347,7 +388,7 @@ impl<'a, T> HfStream<'a, T> { .borrow_mut() .as_mut() .unwrap() - .entry(self.node_id) + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #self_ident -> dest_sink(#sink); @@ -355,16 +396,17 @@ impl<'a, T> HfStream<'a, T> { } } -impl<'a> HfStream<'a, Bytes> { - pub fn send_to( +impl<'a, N: HfNode<'a>> HfStream<'a, Bytes, N> { + pub fn send_to>( &self, - other: usize, - port_name: &str, - cli: RuntimeData<&'a HydroCLI>, - ) -> HfStream<'a, Result> { + other: &N2, + ) -> HfStream<'a, Result, N2> + where + N: HfConnectable<'a, N2>, + { let self_ident = &self.ident; - let cli_splice = cli.splice(); + let self_cli_splice = self.node.get_cli().splice(); let hydroflow_crate = proc_macro_crate::crate_name("hydroflow_plus") .expect("hydroflow_plus should be present in `Cargo.toml`"); @@ -376,18 +418,19 @@ impl<'a> HfStream<'a, Bytes> { } }; - self.graph - .builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(self.node_id) + let mut builders_borrowed = self.graph.builders.borrow_mut(); + let builders = builders_borrowed.as_mut().unwrap(); + + let source_name = self.node.next_port(); + + builders + .entry(self.node.id()) .or_default() .add_statement(parse_quote! { #self_ident -> dest_sink({ use #root::util::cli::ConnectedSink; - #cli_splice - .port(#port_name) + #self_cli_splice + .port(#source_name) .connect_local_blocking::<#root::util::cli::ConnectedDirect>() .into_sink() }); @@ -400,36 +443,38 @@ impl<'a> HfStream<'a, Bytes> { id }; + let recipient_cli_splice = other.get_cli().splice(); + let ident = syn::Ident::new(&format!("stream_{}", recipient_next_id), Span::call_site()); - self.graph - .builders - .borrow_mut() - .as_mut() - .unwrap() - .entry(other) + let recipient_port_name = other.next_port(); + + builders + .entry(other.id()) .or_default() .add_statement(parse_quote! { #ident = source_stream({ use #root::util::cli::ConnectedSource; - #cli_splice - .port(#port_name) + #recipient_cli_splice + .port(#recipient_port_name) .connect_local_blocking::<#root::util::cli::ConnectedDirect>() .into_source() }) -> tee(); }); + self.node.connect(other, &source_name, &recipient_port_name); + HfStream { ident, - node_id: other, + node: other.clone(), graph: self.graph, _phantom: PhantomData, } } } -impl<'a, K, V1> HfStream<'a, (K, V1)> { - pub fn join(&self, n: &HfStream<(K, V2)>) -> HfStream<'a, (K, (V1, V2))> +impl<'a, K, V1, N: HfNode<'a>> HfStream<'a, (K, V1), N> { + pub fn join(&self, n: &HfStream<'a, (K, V2), N>) -> HfStream<'a, (K, (V1, V2)), N> where K: Eq + Hash, { @@ -445,7 +490,11 @@ impl<'a, K, V1> HfStream<'a, (K, V1)> { let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site()); let mut builders = self.graph.builders.borrow_mut(); - let builder = builders.as_mut().unwrap().entry(self.node_id).or_default(); + let builder = builders + .as_mut() + .unwrap() + .entry(self.node.id()) + .or_default(); builder.add_statement(parse_quote! { #ident = join::<'tick, 'tick>() -> tee(); @@ -461,7 +510,7 @@ impl<'a, K, V1> HfStream<'a, (K, V1)> { HfStream { ident, - node_id: self.node_id, + node: self.node.clone(), graph: self.graph, _phantom: PhantomData, } diff --git a/hydroflow_plus_cli_integration/Cargo.toml b/hydroflow_plus_cli_integration/Cargo.toml new file mode 100644 index 000000000000..3f5361067a99 --- /dev/null +++ b/hydroflow_plus_cli_integration/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "hydroflow_plus_cli_integration" +publish = false +version = "0.5.0" +edition = "2021" + +[dependencies] +stageleft = { path = "../stageleft", version = "^0.1.0" } +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" } +hydro_cli = { path = "../hydro_cli", version = "^0.5.0" } +tokio = { version = "1.16", features = [ "full" ] } +async-channel = "1.8.0" diff --git a/hydroflow_plus_cli_integration/src/lib.rs b/hydroflow_plus_cli_integration/src/lib.rs new file mode 100644 index 000000000000..ebaf30016821 --- /dev/null +++ b/hydroflow_plus_cli_integration/src/lib.rs @@ -0,0 +1,113 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::Arc; + +use async_channel::Receiver; +use hydro_cli::core::custom_service::CustomClientPort; +use hydro_cli::core::hydroflow_crate::ports::HydroflowSource; +use hydro_cli::core::{Deployment, Host, HydroflowCrate}; +use hydroflow_plus::node::{HFDeploy, HFNodeBuilder, HfConnectable, HfNode}; +use hydroflow_plus::HfBuilder; +use stageleft::RuntimeData; +use tokio::sync::RwLock; + +#[derive(Clone)] +pub struct CLIDeployNode { + id: usize, + next_port: Rc>, + underlying: Arc>, +} + +impl CLIDeployNode { + pub fn new(id: usize, underlying: Arc>) -> Self { + Self { + id, + next_port: Rc::new(RefCell::new(0)), + underlying, + } + } + + pub async fn create_sender( + &self, + port: &str, + deployment: &mut Deployment, + on: &Arc>, + ) -> CustomClientPort { + let sender_service = deployment.CustomService(on.clone(), vec![]); + let mut sender_port = sender_service.read().await.create_port(&sender_service); + let mut recipient = self + .underlying + .read() + .await + .get_port(port.to_string(), &self.underlying); + + sender_port.send_to(&mut recipient); + sender_port + } + + pub async fn stdout(&self) -> Receiver { + self.underlying.read().await.stdout().await + } + + pub async fn stderr(&self) -> Receiver { + self.underlying.read().await.stderr().await + } +} + +impl<'a> HfNode<'a> for CLIDeployNode { + fn id(&self) -> usize { + self.id + } + + fn next_port(&self) -> String { + let next_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + format!("port_{}", next_port) + } + + fn get_cli(&self) -> RuntimeData<&'a hydroflow_plus::util::cli::HydroCLI> { + Default::default() + } +} + +impl<'a> HfConnectable<'a, CLIDeployNode> for CLIDeployNode { + fn connect(&self, other: &CLIDeployNode, source_port: &str, recipient_port: &str) { + let mut source_port = self + .underlying + .try_read() + .unwrap() + .get_port(source_port.to_string(), &self.underlying); + + let mut recipient_port = other + .underlying + .try_read() + .unwrap() + .get_port(recipient_port.to_string(), &other.underlying); + + source_port.send_to(&mut recipient_port); + } +} + +pub struct CLIDeployNodeBuilder<'a>(Box Arc> + 'a>); + +impl<'a> CLIDeployNodeBuilder<'a> { + pub fn new Arc> + 'a>(f: F) -> Self { + Self(Box::new(f)) + } +} + +impl<'a, 'b> HFNodeBuilder<'a, CLIDeployNode> for CLIDeployNodeBuilder<'b> { + fn build(&mut self, builder: &'a HfBuilder<'a>) -> CLIDeployNode { + let id = builder.next_node_id(); + CLIDeployNode::new(id, (self.0)(id)) + } +} + +pub struct CLIDeploy<'b> { + _marker: std::marker::PhantomData<&'b ()>, +} + +impl<'a, 'b> HFDeploy<'a> for CLIDeploy<'b> { + type Node = CLIDeployNode; + type NodeBuilder = CLIDeployNodeBuilder<'b>; +} diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index c5e701fbce09..6026a3e5cb44 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -18,3 +18,5 @@ stageleft_tool = { path = "../stageleft_tool", version = "^0.1.0" } [dev-dependencies] insta = "1.7.1" +hydro_cli = { path = "../hydro_cli", version = "^0.5.0" } +hydroflow_plus_cli_integration = { path = "../hydroflow_plus_cli_integration", version = "^0.5.0" } diff --git a/hydroflow_plus_test/examples/networked_basic.rs b/hydroflow_plus_test/examples/networked_basic.rs index 1f18b163b070..3d94ec561822 100644 --- a/hydroflow_plus_test/examples/networked_basic.rs +++ b/hydroflow_plus_test/examples/networked_basic.rs @@ -6,7 +6,7 @@ async fn main() { let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap(); let ports = hydroflow::util::cli::init().await; - let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id); + let joined = hydroflow_plus_test::networked::networked_basic_runtime!(&ports, node_id); hydroflow::util::cli::launch_flow(joined).await; } diff --git a/hydroflow_plus_test/examples/networked_basic_deploy.rs b/hydroflow_plus_test/examples/networked_basic_deploy.rs new file mode 100644 index 000000000000..51b4f67aa5a1 --- /dev/null +++ b/hydroflow_plus_test/examples/networked_basic_deploy.rs @@ -0,0 +1,42 @@ +use hydro_cli::core::Deployment; +use hydroflow::futures::SinkExt; +use hydroflow::util::cli::ConnectedSink; +use hydroflow_plus_cli_integration::{CLIDeploy, CLIDeployNodeBuilder}; + +#[tokio::main] +async fn main() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let (node_zero, _) = hydroflow_plus_test::networked::networked_basic::( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + None, + Some("networked_basic".into()), + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + let port_to_zero = node_zero + .create_sender("node_zero_input", &mut deployment, &localhost) + .await; + + deployment.deploy().await.unwrap(); + + let mut conn_to_zero = port_to_zero.connect().await.into_sink(); + + deployment.start().await.unwrap(); + + for line in std::io::stdin().lines() { + conn_to_zero.send(line.unwrap().into()).await.unwrap(); + } +} diff --git a/hydroflow_plus_test/python_tests/basic.py b/hydroflow_plus_test/python_tests/basic.py index 2061a37516ab..7519ed309b15 100644 --- a/hydroflow_plus_test/python_tests/basic.py +++ b/hydroflow_plus_test/python_tests/basic.py @@ -33,7 +33,7 @@ async def test_networked_basic(): sender_port = sender.client_port() sender_port.send_to(program_zero.ports.node_zero_input) - program_zero.ports.zero_to_one.send_to(program_one.ports.zero_to_one) + program_zero.ports.port_0.send_to(program_one.ports.port_0) await deployment.deploy() @@ -45,4 +45,6 @@ async def test_networked_basic(): async for log in receiver_out: assert log == "node one received: \"hi!\"" - break + return + + assert False diff --git a/hydroflow_plus_test/src/lib.rs b/hydroflow_plus_test/src/lib.rs index d532634f1de4..07d01aadfc02 100644 --- a/hydroflow_plus_test/src/lib.rs +++ b/hydroflow_plus_test/src/lib.rs @@ -17,7 +17,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( send_twice: bool, node_id: RuntimeData, ) -> impl Quoted<'a, Hydroflow<'a>> { - let source = graph.source_stream(0, input_stream); + let source = graph.source_stream(&0, input_stream); let map1 = source.map(q!(|v| (v + 1, ()))); let map2 = source.map(q!(|v| (v - 1, ()))); @@ -33,7 +33,7 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( })); } - let source_node_id_1 = graph.source_iter(1, q!(0..5)); + let source_node_id_1 = graph.source_iter(&1, q!(0..5)); source_node_id_1.for_each(q!(|v| { output.send(v).unwrap(); })); @@ -49,8 +49,8 @@ pub fn chat_app<'a>( output: RuntimeData<&'a UnboundedSender<(u32, String)>>, replay_messages: bool, ) -> impl Quoted<'a, Hydroflow<'a>> { - let users = graph.source_stream(0, users_stream).persist(); - let mut messages = graph.source_stream(0, messages); + let users = graph.source_stream(&(), users_stream).persist(); + let mut messages = graph.source_stream(&(), messages); if replay_messages { messages = messages.persist(); } @@ -74,10 +74,10 @@ pub fn graph_reachability<'a>( edges: RuntimeData>, reached_out: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let roots = graph.source_stream(0, roots); - let edges = graph.source_stream(0, edges); + let roots = graph.source_stream(&(), roots); + let edges = graph.source_stream(&(), edges); - let (set_reached_cycle, reached_cycle) = graph.cycle(0); + let (set_reached_cycle, reached_cycle) = graph.cycle(&()); let reached = roots.union(&reached_cycle); let reachable = reached @@ -99,7 +99,7 @@ pub fn count_elems<'a, T: 'a>( input_stream: RuntimeData>, output: RuntimeData<&'a UnboundedSender>, ) -> impl Quoted<'a, Hydroflow<'a>> { - let source = graph.source_stream(0, input_stream); + let source = graph.source_stream(&(), input_stream); let count = source.map(q!(|_| 1)).fold(q!(|| 0), q!(|a, b| *a += b)); count.for_each(q!(|v| { diff --git a/hydroflow_plus_test/src/networked.rs b/hydroflow_plus_test/src/networked.rs index 454a2f2b7398..8469fc8b723f 100644 --- a/hydroflow_plus_test/src/networked.rs +++ b/hydroflow_plus_test/src/networked.rs @@ -1,27 +1,22 @@ use hydroflow::bytes::BytesMut; -use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, HydroCLI}; +use hydroflow::util::cli::HydroCLI; +use hydroflow_plus::node::{CLIRuntime, CLIRuntimeNodeBuilder, HFDeploy, HFNodeBuilder}; use hydroflow_plus::scheduled::graph::Hydroflow; use hydroflow_plus::HfBuilder; use stageleft::{q, Quoted, RuntimeData}; -#[stageleft::entry] -pub fn networked_basic<'a>( +pub fn networked_basic<'a, D: HFDeploy<'a>>( graph: &'a HfBuilder<'a>, - cli: RuntimeData<&'a HydroCLI>, - node_id: RuntimeData, -) -> impl Quoted<'a, Hydroflow<'a>> { - let source_zero = graph.source_stream( - 0, - q!({ - cli.port("node_zero_input") - .connect_local_blocking::() - .into_source() - }), - ); + node_builder: &mut D::NodeBuilder, +) -> (D::Node, D::Node) { + let node_zero = node_builder.build(graph); + let node_one = node_builder.build(graph); + + let source_zero = graph.source_port(&node_zero, "node_zero_input"); source_zero .map(q!(|v| v.unwrap().freeze())) - .send_to(1, "zero_to_one", cli) + .send_to(&node_one) .for_each(q!(|v: Result| { println!( "node one received: {:?}", @@ -29,5 +24,72 @@ pub fn networked_basic<'a>( ); })); + (node_zero, node_one) +} + +#[stageleft::entry] +pub fn networked_basic_runtime<'a>( + graph: &'a HfBuilder<'a>, + cli: RuntimeData<&'a HydroCLI>, + node_id: RuntimeData, +) -> impl Quoted<'a, Hydroflow<'a>> { + let mut node_zero = CLIRuntimeNodeBuilder::new(cli); + let _ = networked_basic::(graph, &mut node_zero); graph.build(node_id) } + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + use std::time::Duration; + + use hydro_cli::core::Deployment; + use hydroflow::futures::SinkExt; + use hydroflow::util::cli::ConnectedSink; + use hydroflow_plus_cli_integration::{CLIDeploy, CLIDeployNodeBuilder}; + + #[tokio::test] + async fn networked_basic() { + let mut deployment = Deployment::new(); + let localhost = deployment.Localhost(); + + let builder = hydroflow_plus::HfBuilder::new(); + let (node_zero, node_one) = super::networked_basic::( + &builder, + &mut CLIDeployNodeBuilder::new(|id| { + deployment.HydroflowCrate( + ".", + localhost.clone(), + None, + Some("networked_basic".into()), + Some("dev".into()), + None, + Some(vec![id.to_string()]), + None, + vec![], + ) + }), + ); + + let port_to_zero = node_zero + .create_sender("node_zero_input", &mut deployment, &localhost) + .await; + + deployment.deploy().await.unwrap(); + + let mut conn_to_zero = port_to_zero.connect().await.into_sink(); + let node_one_stdout = node_one.stdout().await; + + deployment.start().await.unwrap(); + + conn_to_zero.send("hello world!".into()).await.unwrap(); + + assert_eq!( + tokio::time::timeout(Duration::from_secs(1), node_one_stdout.recv()) + .await + .unwrap() + .unwrap(), + "node one received: \"hello world!\"" + ); + } +} diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs index 28fbcca36115..6da2034373ef 100644 --- a/stageleft/src/lib.rs +++ b/stageleft/src/lib.rs @@ -144,7 +144,7 @@ impl< segments: syn::punctuated::Punctuated::from_iter(module_path), }; - let expr: syn::Expr = syn::parse(expr_tokens.into()).unwrap(); + let expr: syn::Expr = syn::parse2(expr_tokens).unwrap(); ( None, Some(quote!({ @@ -195,6 +195,12 @@ impl RuntimeData { } } +impl Default for RuntimeData { + fn default() -> Self { + RuntimeData::new("_") + } +} + impl FreeVariable for RuntimeData { fn to_tokens(self) -> (Option, Option) { let ident = syn::Ident::new(self.ident, Span::call_site()); diff --git a/stageleft_tool/src/lib.rs b/stageleft_tool/src/lib.rs index 35776fc64a59..20501b97bceb 100644 --- a/stageleft_tool/src/lib.rs +++ b/stageleft_tool/src/lib.rs @@ -221,6 +221,6 @@ pub fn gen_final_helper(final_crate: &str) { #[macro_export] macro_rules! gen_final { () => { - $crate::gen_final_helper(env!("CARGO_CRATE_NAME")) + $crate::gen_final_helper(env!("CARGO_PKG_NAME")) }; }