From 8a809315cd37929687fcabc34a12042db25d5767 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Wed, 11 Sep 2024 11:38:23 -0700 Subject: [PATCH] feat(hydroflow_plus): add API for external network inputs (#1449) This is a key step towards being able to unit-test HF+ graphs, by being able to have controlled inputs. Outputs next. --- hydro_deploy/core/src/deployment.rs | 18 + .../hydroflow_plus_deploy/src/deploy.rs | 303 ++++++++----- .../src/deploy_runtime.rs | 13 + .../hydroflow_plus_deploy/src/runtime.rs | 116 ++++- .../hydroflow_plus_deploy/src/trybuild.rs | 1 + hydroflow_plus/src/builder/built.rs | 13 +- hydroflow_plus/src/builder/deploy.rs | 92 +++- hydroflow_plus/src/builder/mod.rs | 100 ++++- hydroflow_plus/src/deploy/graphs.rs | 2 + hydroflow_plus/src/deploy/mod.rs | 87 +++- hydroflow_plus/src/ir.rs | 401 ++++++++++-------- hydroflow_plus/src/location.rs | 29 ++ hydroflow_plus/src/persist_pullup.rs | 4 + hydroflow_plus/src/stream.rs | 6 +- .../examples/first_ten_distributed.rs | 25 +- ...ter__compute_pi__tests__compute_pi_ir.snap | 2 + ...er__many_to_many__tests__many_to_many.snap | 2 + ...ter__map_reduce__tests__map_reduce_ir.snap | 4 + ...cluster__paxos_bench__tests__paxos_ir.snap | 84 ++++ ...simple_cluster__tests__simple_cluster.snap | 4 + .../src/distributed/first_ten.rs | 43 +- ...rst_ten__tests__first_ten_distributed.snap | 41 +- stageleft/src/type_name.rs | 17 + 23 files changed, 1062 insertions(+), 345 deletions(-) diff --git a/hydro_deploy/core/src/deployment.rs b/hydro_deploy/core/src/deployment.rs index 6eba1b18e1e7..faec27e64bf5 100644 --- a/hydro_deploy/core/src/deployment.rs +++ b/hydro_deploy/core/src/deployment.rs @@ -69,11 +69,29 @@ impl Deployment { Ok(()) } + /// Runs `start()`, waits for the trigger future, then runs `stop()`. + /// This is useful if you need to initiate external network connections between + /// `deploy()` and `start()`. + pub async fn start_until(&mut self, trigger: impl Future) -> Result<()> { + // TODO(mingwei): should `trigger` interrupt `deploy()` and `start()`? If so make sure shutdown works as expected. + self.start().await?; + trigger.await; + self.stop().await?; + Ok(()) + } + /// Runs `deploy()`, and `start()`, waits for CTRL+C, then runs `stop()`. pub async fn run_ctrl_c(&mut self) -> Result<()> { self.run_until(tokio::signal::ctrl_c().map(|_| ())).await } + /// Runs `start()`, waits for CTRL+C, then runs `stop()`. + /// This is useful if you need to initiate external network connections between + /// `deploy()` and `start()`. 
+ pub async fn start_ctrl_c(&mut self) -> Result<()> { + self.start_until(tokio::signal::ctrl_c().map(|_| ())).await + } + pub async fn deploy(&mut self) -> Result<()> { self.services.retain(|weak| weak.strong_count() > 0); diff --git a/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs b/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs index 8ed33fcf2f94..266a37bec892 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/deploy.rs @@ -1,5 +1,8 @@ use std::cell::RefCell; use std::collections::HashMap; +use std::future::Future; +use std::io::Error; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; @@ -9,10 +12,13 @@ use hydro_deploy::hydroflow_crate::ports::{ }; use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions; use hydro_deploy::hydroflow_crate::HydroflowCrateService; -use hydro_deploy::{Deployment, Host, HydroflowCrate}; -use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec}; +use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate}; +use hydroflow_plus::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort}; +use hydroflow_plus::futures::SinkExt; use hydroflow_plus::lang::graph::HydroflowGraph; +use hydroflow_plus::util::deploy::ConnectedSink; use nameof::name_of; +use serde::Serialize; use sha2::{Digest, Sha256}; use stageleft::{Quoted, RuntimeData}; use tokio::sync::RwLock; @@ -28,28 +34,33 @@ impl<'a> Deploy<'a> for HydroDeploy { type CompileEnv = (); type Process = DeployNode; type Cluster = DeployCluster; + type ExternalProcess = DeployExternal; type Meta = HashMap>; type GraphId = (); - type ProcessPort = DeployPort; - type ClusterPort = DeployPort; + type Port = String; + type ExternalRawPort = CustomClientPort; - fn allocate_process_port(process: &Self::Process) -> Self::ProcessPort { + fn allocate_process_port(process: &Self::Process) -> Self::Port { process.next_port() } - fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::ClusterPort { + fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port { cluster.next_port() } + fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port { + external.next_port() + } + fn o2o_sink_source( _env: &(), _p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, _p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - let p1_port = p1_port.port.as_str(); - let p2_port = p2_port.port.as_str(); + let p1_port = p1_port.as_str(); + let p2_port = p2_port.as_str(); deploy_o2o( RuntimeData::new("__hydroflow_plus_trybuild_cli"), p1_port, @@ -59,23 +70,23 @@ impl<'a> Deploy<'a> for HydroDeploy { fn o2o_connect( p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) { let self_underlying_borrow = p1.underlying.borrow(); let self_underlying = self_underlying_borrow.as_ref().unwrap(); let source_port = self_underlying .try_read() .unwrap() - .get_port(p1_port.port.clone(), self_underlying); + .get_port(p1_port.clone(), self_underlying); let other_underlying_borrow = p2.underlying.borrow(); let other_underlying = other_underlying_borrow.as_ref().unwrap(); let recipient_port = other_underlying .try_read() .unwrap() - .get_port(p2_port.port.clone(), other_underlying); + .get_port(p2_port.clone(), other_underlying); source_port.send_to(&recipient_port); } @@ -83,12 +94,12 @@ impl<'a> Deploy<'a> for HydroDeploy { fn o2m_sink_source( _env: &(), _p1: 
&Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, _c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - let p1_port = p1_port.port.as_str(); - let c2_port = c2_port.port.as_str(); + let p1_port = p1_port.as_str(); + let c2_port = c2_port.as_str(); deploy_o2m( RuntimeData::new("__hydroflow_plus_trybuild_cli"), p1_port, @@ -98,16 +109,16 @@ impl<'a> Deploy<'a> for HydroDeploy { fn o2m_connect( p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) { let self_underlying_borrow = p1.underlying.borrow(); let self_underlying = self_underlying_borrow.as_ref().unwrap(); let source_port = self_underlying .try_read() .unwrap() - .get_port(p1_port.port.clone(), self_underlying); + .get_port(p1_port.clone(), self_underlying); let recipient_port = DemuxSink { demux: c2 @@ -119,7 +130,7 @@ impl<'a> Deploy<'a> for HydroDeploy { let n = c.underlying.try_read().unwrap(); ( id as u32, - Arc::new(n.get_port(c2_port.port.clone(), &c.underlying)) + Arc::new(n.get_port(c2_port.clone(), &c.underlying)) as Arc, ) }) @@ -132,12 +143,12 @@ impl<'a> Deploy<'a> for HydroDeploy { fn m2o_sink_source( _env: &(), _c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, _p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - let c1_port = c1_port.port.as_str(); - let p2_port = p2_port.port.as_str(); + let c1_port = c1_port.as_str(); + let p2_port = p2_port.as_str(); deploy_m2o( RuntimeData::new("__hydroflow_plus_trybuild_cli"), c1_port, @@ -147,16 +158,16 @@ impl<'a> Deploy<'a> for HydroDeploy { fn m2o_connect( c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) { let other_underlying_borrow = p2.underlying.borrow(); let other_underlying = other_underlying_borrow.as_ref().unwrap(); let recipient_port = other_underlying .try_read() .unwrap() - .get_port(p2_port.port.clone(), other_underlying) + .get_port(p2_port.clone(), other_underlying) .merge(); for (i, node) in c1.members.borrow().iter().enumerate() { @@ -164,7 +175,7 @@ impl<'a> Deploy<'a> for HydroDeploy { .underlying .try_read() .unwrap() - .get_port(c1_port.port.clone(), &node.underlying); + .get_port(c1_port.clone(), &node.underlying); TaggedSource { source: Arc::new(source_port), @@ -177,12 +188,12 @@ impl<'a> Deploy<'a> for HydroDeploy { fn m2m_sink_source( _env: &(), _c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, _c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { - let c1_port = c1_port.port.as_str(); - let c2_port = c2_port.port.as_str(); + let c1_port = c1_port.as_str(); + let c2_port = c2_port.as_str(); deploy_m2m( RuntimeData::new("__hydroflow_plus_trybuild_cli"), c1_port, @@ -192,16 +203,16 @@ impl<'a> Deploy<'a> for HydroDeploy { fn m2m_connect( c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) { for (i, sender) in c1.members.borrow().iter().enumerate() { let source_port = sender .underlying .try_read() .unwrap() - .get_port(c1_port.port.clone(), &sender.underlying); + .get_port(c1_port.clone(), &sender.underlying); let recipient_port = DemuxSink { demux: c2 @@ -213,7 +224,7 @@ impl<'a> Deploy<'a> for HydroDeploy { let n = 
c.underlying.try_read().unwrap(); ( id as u32, - Arc::new(n.get_port(c2_port.port.clone(), &c.underlying).merge()) + Arc::new(n.get_port(c2_port.clone(), &c.underlying).merge()) as Arc, ) }) @@ -228,6 +239,49 @@ impl<'a> Deploy<'a> for HydroDeploy { } } + fn e2o_source( + _compile_env: &Self::CompileEnv, + _p1: &Self::ExternalProcess, + p1_port: &Self::Port, + _p2: &Self::Process, + p2_port: &Self::Port, + ) -> syn::Expr { + let p1_port = p1_port.as_str(); + let p2_port = p2_port.as_str(); + deploy_e2o( + RuntimeData::new("__hydroflow_plus_trybuild_cli"), + p1_port, + p2_port, + ) + } + + fn e2o_connect( + p1: &Self::ExternalProcess, + p1_port: &Self::Port, + p2: &Self::Process, + p2_port: &Self::Port, + ) { + let self_underlying_borrow = p1.underlying.borrow(); + let self_underlying = self_underlying_borrow.as_ref().unwrap(); + let source_port = self_underlying + .try_read() + .unwrap() + .declare_client(self_underlying); + + let other_underlying_borrow = p2.underlying.borrow(); + let other_underlying = other_underlying_borrow.as_ref().unwrap(); + let recipient_port = other_underlying + .try_read() + .unwrap() + .get_port(p2_port.clone(), other_underlying); + + source_port.send_to(&recipient_port); + + p1.client_ports + .borrow_mut() + .insert(p1_port.clone(), source_port); + } + fn cluster_ids( _env: &Self::CompileEnv, of_cluster: usize, @@ -246,25 +300,6 @@ impl<'a> Deploy<'a> for HydroDeploy { pub trait DeployCrateWrapper { fn underlying(&self) -> Arc>; - #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")] - async fn create_sender( - &self, - port: &str, - deployment: &mut Deployment, - on: &Arc, - ) -> CustomClientPort { - let sender_service = deployment.CustomService(on.clone(), vec![]); - let sender_port = sender_service.read().await.declare_client(&sender_service); - let recipient = self - .underlying() - .read() - .await - .get_port(port.to_string(), &self.underlying()); - - sender_port.send_to(&recipient); - sender_port - } - #[expect(async_fn_in_trait, reason = "no auto trait bounds needed")] async fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver { self.underlying().read().await.stdout() @@ -345,6 +380,112 @@ impl From> for TrybuildHost { } } +#[derive(Clone)] +pub struct DeployExternal { + next_port: Rc>, + host: Arc, + underlying: Rc>>>>, + client_ports: Rc>>, + allocated_ports: Rc>>, +} + +impl DeployExternal { + pub fn take_port(&self, key: usize) -> CustomClientPort { + self.client_ports + .borrow_mut() + .remove(self.allocated_ports.borrow().get(&key).unwrap()) + .unwrap() + } +} + +impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal { + fn register(&self, key: usize, port: ::Port) { + self.allocated_ports.borrow_mut().insert(key, port); + } + + fn raw_port(&self, key: usize) -> ::ExternalRawPort { + self.client_ports + .borrow_mut() + .remove(self.allocated_ports.borrow().get(&key).unwrap()) + .unwrap() + } + + fn as_bytes_sink( + &self, + key: usize, + ) -> impl Future< + Output = Pin< + Box>, + >, + > + 'a { + let port = self.raw_port(key); + async move { + let sink = port.connect().await.into_sink(); + Box::pin(sink) + as Pin< + Box< + dyn hydroflow_plus::futures::Sink< + hydroflow_plus::bytes::Bytes, + Error = Error, + >, + >, + > + } + } + + fn as_bincode_sink( + &self, + key: usize, + ) -> impl Future>>> + 'a + { + let port = self.raw_port(key); + async move { + let sink = port.connect().await.into_sink(); + Box::pin(sink.with(|item| async move { + Ok(hydroflow_plus::bincode::serialize(&item).unwrap().into()) + })) as Pin>> + } + } +} 
+ +impl Node for DeployExternal { + type Port = String; + type Meta = HashMap>; + type InstantiateEnv = Deployment; + + fn next_port(&self) -> Self::Port { + let next_port = *self.next_port.borrow(); + *self.next_port.borrow_mut() += 1; + + format!("port_{}", next_port) + } + + fn instantiate( + &self, + env: &mut Self::InstantiateEnv, + _meta: &mut Self::Meta, + _graph: HydroflowGraph, + _extra_stmts: Vec, + ) { + let service = env.CustomService(self.host.clone(), vec![]); + *self.underlying.borrow_mut() = Some(service); + } + + fn update_meta(&mut self, _meta: &Self::Meta) {} +} + +impl<'a> ExternalSpec<'a, HydroDeploy> for Arc { + fn build(self, _id: usize, _name_hint: &str) -> DeployExternal { + DeployExternal { + next_port: Rc::new(RefCell::new(0)), + host: self, + underlying: Rc::new(RefCell::new(None)), + allocated_ports: Rc::new(RefCell::new(HashMap::new())), + client_ports: Rc::new(RefCell::new(HashMap::new())), + } + } +} + pub enum CrateOrTrybuild { Crate(HydroflowCrate), Trybuild(TrybuildHost), @@ -364,49 +505,16 @@ impl DeployCrateWrapper for DeployNode { } } -pub struct DeployPort { - node: N, - port: String, -} - -impl DeployPort { - pub async fn create_sender( - &self, - deployment: &mut Deployment, - on: &Arc, - ) -> CustomClientPort { - self.node.create_sender(&self.port, deployment, on).await - } -} - -impl DeployPort { - pub async fn create_senders( - &self, - deployment: &mut Deployment, - on: &Arc, - ) -> Vec { - let mut out = vec![]; - for member in self.node.members() { - out.push(member.create_sender(&self.port, deployment, on).await); - } - - out - } -} - impl Node for DeployNode { - type Port = DeployPort; + type Port = String; type Meta = HashMap>; type InstantiateEnv = Deployment; - fn next_port(&self) -> DeployPort { + fn next_port(&self) -> String { let next_port = *self.next_port.borrow(); *self.next_port.borrow_mut() += 1; - DeployPort { - node: self.clone(), - port: format!("port_{}", next_port), - } + format!("port_{}", next_port) } fn update_meta(&mut self, meta: &Self::Meta) { @@ -466,18 +574,15 @@ impl DeployCluster { } impl Node for DeployCluster { - type Port = DeployPort; + type Port = String; type Meta = HashMap>; type InstantiateEnv = Deployment; - fn next_port(&self) -> DeployPort { + fn next_port(&self) -> String { let next_port = *self.next_port.borrow(); *self.next_port.borrow_mut() += 1; - DeployPort { - node: self.clone(), - port: format!("port_{}", next_port), - } + format!("port_{}", next_port) } fn instantiate( diff --git a/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs b/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs index df9149667cdb..60c5ddb15c39 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/deploy_runtime.rs @@ -120,3 +120,16 @@ pub fn deploy_m2m( }, ) } + +pub fn deploy_e2o( + env: RuntimeData<&DeployPorts>, + c1_port: &str, + p2_port: &str, +) -> syn::Expr { + q!({ + env.port(p2_port) + .connect_local_blocking::() + .into_source() + }) + .splice_untyped() +} diff --git a/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs b/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs index b139660ea3df..ba761ef3e1d3 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/runtime.rs @@ -1,7 +1,8 @@ use std::cell::RefCell; +use std::pin::Pin; use std::rc::Rc; -use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec}; +use hydroflow_plus::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, 
ProcessSpec, RegisterPort}; use hydroflow_plus::lang::graph::HydroflowGraph; use hydroflow_plus::util::deploy::DeployPorts; use stageleft::{Quoted, RuntimeData}; @@ -15,10 +16,11 @@ impl<'a> Deploy<'a> for DeployRuntime { type CompileEnv = RuntimeData<&'a DeployPorts>; type Process = DeployRuntimeNode; type Cluster = DeployRuntimeCluster; + type ExternalProcess = DeployRuntimeNode; + type Port = String; + type ExternalRawPort = (); type Meta = (); type GraphId = usize; - type ProcessPort = String; - type ClusterPort = String; fn has_trivial_node() -> bool { true @@ -36,29 +38,33 @@ impl<'a> Deploy<'a> for DeployRuntime { } } - fn allocate_process_port(process: &Self::Process) -> Self::ProcessPort { + fn allocate_process_port(process: &Self::Process) -> Self::Port { process.next_port() } - fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::ClusterPort { + fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port { cluster.next_port() } + fn allocate_external_port(_external: &Self::ExternalProcess) -> Self::Port { + panic!(); + } + fn o2o_sink_source( env: &Self::CompileEnv, _p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, _p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { crate::deploy_runtime::deploy_o2o(*env, p1_port.as_str(), p2_port.as_str()) } fn o2o_connect( _p1: &Self::Process, - _p1_port: &Self::ProcessPort, + _p1_port: &Self::Port, _p2: &Self::Process, - _p2_port: &Self::ProcessPort, + _p2_port: &Self::Port, ) { panic!() } @@ -66,18 +72,18 @@ impl<'a> Deploy<'a> for DeployRuntime { fn o2m_sink_source( env: &Self::CompileEnv, _p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, _c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { crate::deploy_runtime::deploy_o2m(*env, p1_port.as_str(), c2_port.as_str()) } fn o2m_connect( _p1: &Self::Process, - _p1_port: &Self::ProcessPort, + _p1_port: &Self::Port, _c2: &Self::Cluster, - _c2_port: &Self::ClusterPort, + _c2_port: &Self::Port, ) { panic!() } @@ -85,18 +91,18 @@ impl<'a> Deploy<'a> for DeployRuntime { fn m2o_sink_source( env: &Self::CompileEnv, _c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, _p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { crate::deploy_runtime::deploy_m2o(*env, c1_port.as_str(), p2_port.as_str()) } fn m2o_connect( _c1: &Self::Cluster, - _c1_port: &Self::ClusterPort, + _c1_port: &Self::Port, _p2: &Self::Process, - _p2_port: &Self::ProcessPort, + _p2_port: &Self::Port, ) { panic!() } @@ -104,18 +110,37 @@ impl<'a> Deploy<'a> for DeployRuntime { fn m2m_sink_source( env: &Self::CompileEnv, _c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, _c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr) { crate::deploy_runtime::deploy_m2m(*env, c1_port.as_str(), c2_port.as_str()) } fn m2m_connect( _c1: &Self::Cluster, - _c1_port: &Self::ClusterPort, + _c1_port: &Self::Port, _c2: &Self::Cluster, - _c2_port: &Self::ClusterPort, + _c2_port: &Self::Port, + ) { + panic!() + } + + fn e2o_source( + _compile_env: &Self::CompileEnv, + _p1: &Self::ExternalProcess, + _p1_port: &Self::Port, + _p2: &Self::Process, + _p2_port: &Self::Port, + ) -> syn::Expr { + panic!() + } + + fn e2o_connect( + _p1: &Self::ExternalProcess, + _p1_port: &Self::Port, + _p2: &Self::Process, + _p2_port: &Self::Port, ) { panic!() } @@ -137,6 
+162,49 @@ pub struct DeployRuntimeNode { next_port: Rc>, } +impl<'a> RegisterPort<'a, DeployRuntime> for DeployRuntimeNode { + fn register(&self, _key: usize, _port: ::Port) { + panic!() + } + + fn raw_port(&self, _key: usize) -> ::ExternalRawPort { + panic!() + } + + #[expect( + clippy::manual_async_fn, + reason = "buggy Clippy lint for lifetime bounds" + )] + fn as_bytes_sink( + &self, + _key: usize, + ) -> impl std::future::Future< + Output = Pin< + Box< + dyn hydroflow_plus::futures::Sink< + hydroflow_plus::bytes::Bytes, + Error = std::io::Error, + >, + >, + >, + > + 'a { + async { panic!() } + } + + #[expect( + clippy::manual_async_fn, + reason = "buggy Clippy lint for lifetime bounds" + )] + fn as_bincode_sink( + &self, + _key: usize, + ) -> impl std::future::Future< + Output = Pin>>, + > + 'a { + async { panic!() } + } +} + impl Node for DeployRuntimeNode { type Port = String; type Meta = (); @@ -205,3 +273,9 @@ impl<'cli> ClusterSpec<'cli, DeployRuntime> for () { } } } + +impl<'cli> ExternalSpec<'cli, DeployRuntime> for () { + fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode { + panic!() + } +} diff --git a/hydro_deploy/hydroflow_plus_deploy/src/trybuild.rs b/hydro_deploy/hydroflow_plus_deploy/src/trybuild.rs index 5ec673974df2..c71f91a9d277 100644 --- a/hydro_deploy/hydroflow_plus_deploy/src/trybuild.rs +++ b/hydro_deploy/hydroflow_plus_deploy/src/trybuild.rs @@ -17,6 +17,7 @@ pub fn compile_graph_trybuild(graph: HydroflowGraph, extra_stmts: Vec let source_ast: syn::File = syn::parse_quote! { #![allow(unused_crate_dependencies, missing_docs)] + use hydroflow_plus::*; #[allow(unused)] fn __hfplus_runtime<'a>(__hydroflow_plus_trybuild_cli: &'a hydroflow_plus::util::deploy::DeployPorts) -> hydroflow_plus::Hydroflow<'a> { diff --git a/hydroflow_plus/src/builder/built.rs b/hydroflow_plus/src/builder/built.rs index b972114d5152..d0d749965507 100644 --- a/hydroflow_plus/src/builder/built.rs +++ b/hydroflow_plus/src/builder/built.rs @@ -4,9 +4,9 @@ use std::marker::PhantomData; use hydroflow_lang::graph::{eliminate_extra_unions_tees, HydroflowGraph}; use super::deploy::{DeployFlow, DeployResult}; -use crate::deploy::{ClusterSpec, Deploy, LocalDeploy, ProcessSpec}; +use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, LocalDeploy, ProcessSpec}; use crate::ir::HfPlusLeaf; -use crate::location::{Cluster, Process}; +use crate::location::{Cluster, ExternalProcess, Process}; use crate::HfCompiled; pub struct BuiltFlow<'a> { @@ -103,6 +103,7 @@ impl<'a> BuiltFlow<'a> { ir: std::mem::take(&mut self.ir), nodes: processes, clusters, + externals: HashMap::new(), used: false, _phantom: PhantomData, } @@ -116,6 +117,14 @@ impl<'a> BuiltFlow<'a> { self.into_deploy().with_process(process, spec) } + pub fn with_external>( + self, + process: &ExternalProcess
<P>
, + spec: impl ExternalSpec<'a, D>, + ) -> DeployFlow<'a, D> { + self.into_deploy().with_external(process, spec) + } + pub fn with_cluster>( self, cluster: &Cluster, diff --git a/hydroflow_plus/src/builder/deploy.rs b/hydroflow_plus/src/builder/deploy.rs index 0c46bbfd573b..4fcdd1cb342c 100644 --- a/hydroflow_plus/src/builder/deploy.rs +++ b/hydroflow_plus/src/builder/deploy.rs @@ -1,18 +1,27 @@ use std::collections::{BTreeMap, HashMap}; +use std::io::Error; use std::marker::PhantomData; +use std::pin::Pin; +use hydroflow::bytes::Bytes; +use hydroflow::futures::Sink; use proc_macro2::Span; +use serde::de::DeserializeOwned; +use serde::Serialize; use stageleft::Quoted; use super::built::build_inner; -use crate::deploy::{LocalDeploy, Node}; +use crate::deploy::{ExternalSpec, LocalDeploy, Node, RegisterPort}; use crate::ir::HfPlusLeaf; -use crate::location::{Location, LocationId}; +use crate::location::{ + ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId, +}; use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec}; pub struct DeployFlow<'a, D: LocalDeploy<'a>> { pub(super) ir: Vec>, pub(super) nodes: HashMap, + pub(super) externals: HashMap, pub(super) clusters: HashMap, pub(super) used: bool, @@ -35,6 +44,17 @@ impl<'a, D: LocalDeploy<'a>> DeployFlow<'a, D> { self } + pub fn with_external
<P>
( + mut self, + process: &ExternalProcess
<P>
, + spec: impl ExternalSpec<'a, D>, + ) -> Self { + let tag_name = std::any::type_name::
<P>
().to_string(); + self.externals + .insert(process.id, spec.build(process.id, &tag_name)); + self + } + pub fn with_cluster(mut self, cluster: &Cluster, spec: impl ClusterSpec<'a, D>) -> Self { let tag_name = std::any::type_name::().to_string(); self.clusters @@ -50,7 +70,15 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> { let mut seen_tees: HashMap<_, _> = HashMap::new(); let mut ir_leaves_networked: Vec = std::mem::take(&mut self.ir) .into_iter() - .map(|leaf| leaf.compile_network::(env, &mut seen_tees, &self.nodes, &self.clusters)) + .map(|leaf| { + leaf.compile_network::( + env, + &mut seen_tees, + &self.nodes, + &self.clusters, + &self.externals, + ) + }) .collect(); let extra_stmts = self.extra_stmts(env); @@ -111,6 +139,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { &mut seen_tees_instantiate, &self.nodes, &self.clusters, + &self.externals, ) }) .collect(); @@ -119,7 +148,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { let mut extra_stmts = self.extra_stmts(&()); let mut meta = D::Meta::default(); - let (mut processes, mut clusters) = ( + let (mut processes, mut clusters, mut externals) = ( std::mem::take(&mut self.nodes) .into_iter() .map(|(node_id, node)| { @@ -144,6 +173,18 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { (cluster_id, cluster) }) .collect::>(), + std::mem::take(&mut self.externals) + .into_iter() + .map(|(external_id, external)| { + external.instantiate( + env, + &mut meta, + compiled.remove(&external_id).unwrap(), + extra_stmts.remove(&external_id).unwrap_or_default(), + ); + (external_id, external) + }) + .collect::>(), ); for node in processes.values_mut() { @@ -154,6 +195,10 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { cluster.update_meta(&meta); } + for external in externals.values_mut() { + external.update_meta(&meta); + } + let mut seen_tees_connect = HashMap::new(); for leaf in ir_leaves_networked { leaf.connect_network(&mut seen_tees_connect); @@ -162,6 +207,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { DeployResult { processes, clusters, + externals, } } } @@ -169,13 +215,14 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> { pub struct DeployResult<'a, D: Deploy<'a>> { processes: HashMap, clusters: HashMap, + externals: HashMap, } impl<'a, D: Deploy<'a>> DeployResult<'a, D> { pub fn get_process
<P>
(&self, p: &Process
<P>
) -> &D::Process { let id = match p.id() { LocationId::Process(id) => id, - LocationId::Cluster(id) => id, + _ => panic!("Process ID expected"), }; self.processes.get(&id).unwrap() @@ -183,10 +230,43 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> { pub fn get_cluster(&self, c: &Cluster) -> &D::Cluster { let id = match c.id() { - LocationId::Process(id) => id, LocationId::Cluster(id) => id, + _ => panic!("Cluster ID expected"), }; self.clusters.get(&id).unwrap() } + + pub fn get_external
<P>
(&self, p: &ExternalProcess
<P>
) -> &D::ExternalProcess { + self.externals.get(&p.id).unwrap() + } + + pub fn raw_port(&self, port: ExternalBytesPort) -> D::ExternalRawPort { + self.externals + .get(&port.process_id) + .unwrap() + .raw_port(port.port_id) + } + + pub async fn connect_sink_bytes( + &self, + port: ExternalBytesPort, + ) -> Pin>> { + self.externals + .get(&port.process_id) + .unwrap() + .as_bytes_sink(port.port_id) + .await + } + + pub async fn connect_sink_bincode( + &self, + port: ExternalBincodePort, + ) -> Pin>> { + self.externals + .get(&port.process_id) + .unwrap() + .as_bincode_sink(port.port_id) + .await + } } diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 6cd4cb3209f8..38e8907d3c38 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -4,17 +4,22 @@ use std::marker::PhantomData; use std::rc::Rc; use std::time::Duration; +use hydroflow::bytes::Bytes; use hydroflow::futures::stream::Stream as FuturesStream; use hydroflow::{tokio, tokio_stream}; use internal::TokenStream; use proc_macro2::Span; use quote::quote; use runtime_support::FreeVariable; +use serde::de::DeserializeOwned; +use serde::Serialize; use stageleft::*; use crate::cycle::{CycleCollection, CycleCollectionWithInitial}; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; -use crate::location::{Cluster, Location, LocationId, Process}; +use crate::location::{ + Cluster, ExternalBincodePort, ExternalBytesPort, ExternalProcess, Location, LocationId, Process, +}; use crate::stream::{Bounded, NoTick, Tick, Unbounded}; use crate::{HfCycle, Optional, RuntimeContext, Singleton, Stream}; @@ -75,6 +80,7 @@ pub struct FlowBuilder<'a> { cycle_ids: RefCell>, next_node_id: RefCell, + next_external_port_id: RefCell, /// Tracks whether this flow has been finalized; it is an error to /// drop without finalizing. @@ -113,6 +119,7 @@ impl<'a> FlowBuilder<'a> { clusters: RefCell::new(vec![]), cycle_ids: RefCell::new(HashMap::new()), next_node_id: RefCell::new(0), + next_external_port_id: RefCell::new(0), finalized: false, _phantom: PhantomData, } @@ -158,6 +165,19 @@ impl<'a> FlowBuilder<'a> { } } + pub fn external_process
<P>
(&self) -> ExternalProcess
<P>
{ + let mut next_node_id = self.next_node_id.borrow_mut(); + let id = *next_node_id; + *next_node_id += 1; + + self.nodes.borrow_mut().push(id); + + ExternalProcess { + id, + _phantom: PhantomData, + } + } + pub fn cluster(&self) -> Cluster { let mut next_node_id = self.next_node_id.borrow_mut(); let id = *next_node_id; @@ -216,6 +236,81 @@ impl<'a> FlowBuilder<'a> { .tick_batch() } + pub fn source_external_bytes( + &self, + from: &ExternalProcess
<P>
, + to: &L, + ) -> (ExternalBytesPort, Stream<'a, Bytes, Unbounded, NoTick, L>) { + let next_external_port_id = { + let mut next_external_port_id = self.next_external_port_id.borrow_mut(); + let id = *next_external_port_id; + *next_external_port_id += 1; + id + }; + + ( + ExternalBytesPort { + process_id: from.id, + port_id: next_external_port_id, + }, + Stream::new( + to.id(), + self.ir_leaves().clone(), + HfPlusNode::Persist(Box::new(HfPlusNode::Network { + from_location: LocationId::ExternalProcess(from.id), + from_key: Some(next_external_port_id), + to_location: to.id(), + to_key: None, + serialize_pipeline: None, + instantiate_fn: crate::ir::DebugInstantiate::Building(), + deserialize_pipeline: Some(syn::parse_quote!(map(|b| b.unwrap().freeze()))), + input: Box::new(HfPlusNode::Source { + source: HfPlusSource::ExternalNetwork(), + location_kind: LocationId::ExternalProcess(from.id), + }), + })), + ), + ) + } + + pub fn source_external_bincode( + &self, + from: &ExternalProcess
<P>
, + to: &L, + ) -> (ExternalBincodePort, Stream<'a, T, Unbounded, NoTick, L>) { + let next_external_port_id = { + let mut next_external_port_id = self.next_external_port_id.borrow_mut(); + let id = *next_external_port_id; + *next_external_port_id += 1; + id + }; + + ( + ExternalBincodePort { + process_id: from.id, + port_id: next_external_port_id, + _phantom: PhantomData, + }, + Stream::new( + to.id(), + self.ir_leaves().clone(), + HfPlusNode::Persist(Box::new(HfPlusNode::Network { + from_location: LocationId::ExternalProcess(from.id), + from_key: Some(next_external_port_id), + to_location: to.id(), + to_key: None, + serialize_pipeline: None, + instantiate_fn: crate::ir::DebugInstantiate::Building(), + deserialize_pipeline: Some(crate::stream::deserialize_bincode::(false)), + input: Box::new(HfPlusNode::Source { + source: HfPlusSource::ExternalNetwork(), + location_kind: LocationId::ExternalProcess(from.id), + }), + })), + ), + ) + } + pub fn source_stream + Unpin, L: Location>( &self, on: &L, @@ -333,6 +428,7 @@ impl<'a> FlowBuilder<'a> { let on_id = match on.id() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; let mut cycle_ids = self.cycle_ids.borrow_mut(); @@ -362,6 +458,7 @@ impl<'a> FlowBuilder<'a> { let on_id = match on.id() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; let mut cycle_ids = self.cycle_ids.borrow_mut(); @@ -392,6 +489,7 @@ impl<'a> FlowBuilder<'a> { let on_id = match on.id() { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; let mut cycle_ids = self.cycle_ids.borrow_mut(); diff --git a/hydroflow_plus/src/deploy/graphs.rs b/hydroflow_plus/src/deploy/graphs.rs index 9cfc060698f9..a794dbb8c32e 100644 --- a/hydroflow_plus/src/deploy/graphs.rs +++ b/hydroflow_plus/src/deploy/graphs.rs @@ -7,6 +7,7 @@ pub struct SingleProcessGraph {} impl<'a> LocalDeploy<'a> for SingleProcessGraph { type Process = SingleNode; type Cluster = SingleNode; + type ExternalProcess = SingleNode; type Meta = (); type GraphId = (); @@ -58,6 +59,7 @@ pub struct MultiGraph {} impl<'a> LocalDeploy<'a> for MultiGraph { type Process = MultiNode; type Cluster = MultiNode; + type ExternalProcess = MultiNode; type Meta = (); type GraphId = usize; diff --git a/hydroflow_plus/src/deploy/mod.rs b/hydroflow_plus/src/deploy/mod.rs index ed78029a605c..56d8317bbd7d 100644 --- a/hydroflow_plus/src/deploy/mod.rs +++ b/hydroflow_plus/src/deploy/mod.rs @@ -1,4 +1,11 @@ +use std::future::Future; +use std::io::Error; +use std::pin::Pin; + +use hydroflow::bytes::Bytes; +use hydroflow::futures::Sink; use hydroflow_lang::graph::HydroflowGraph; +use serde::Serialize; use stageleft::Quoted; pub mod graphs; @@ -7,6 +14,7 @@ pub use graphs::*; pub trait LocalDeploy<'a> { type Process: Node; type Cluster: Node; + type ExternalProcess: Node; type Meta: Default; type GraphId; @@ -29,8 +37,10 @@ pub trait Deploy<'a> { type Process: Node + Clone; type Cluster: Node + Clone; - type ProcessPort; - type ClusterPort; + type ExternalProcess: Node + + RegisterPort<'a, Self>; + type Port: Clone; + type ExternalRawPort; type Meta: Default; /// Type of ID used to switch between different subgraphs at runtime. 
@@ -48,63 +58,79 @@ pub trait Deploy<'a> { panic!("No trivial cluster") } - fn allocate_process_port(process: &Self::Process) -> Self::ProcessPort; - fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::ClusterPort; + fn allocate_process_port(process: &Self::Process) -> Self::Port; + fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::Port; + fn allocate_external_port(external: &Self::ExternalProcess) -> Self::Port; fn o2o_sink_source( compile_env: &Self::CompileEnv, p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr); fn o2o_connect( p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ); fn o2m_sink_source( compile_env: &Self::CompileEnv, p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr); fn o2m_connect( p1: &Self::Process, - p1_port: &Self::ProcessPort, + p1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ); fn m2o_sink_source( compile_env: &Self::CompileEnv, c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ) -> (syn::Expr, syn::Expr); fn m2o_connect( c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, p2: &Self::Process, - p2_port: &Self::ProcessPort, + p2_port: &Self::Port, ); fn m2m_sink_source( compile_env: &Self::CompileEnv, c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, ) -> (syn::Expr, syn::Expr); fn m2m_connect( c1: &Self::Cluster, - c1_port: &Self::ClusterPort, + c1_port: &Self::Port, c2: &Self::Cluster, - c2_port: &Self::ClusterPort, + c2_port: &Self::Port, + ); + + fn e2o_source( + compile_env: &Self::CompileEnv, + p1: &Self::ExternalProcess, + p1_port: &Self::Port, + p2: &Self::Process, + p2_port: &Self::Port, + ) -> syn::Expr; + + fn e2o_connect( + p1: &Self::ExternalProcess, + p1_port: &Self::Port, + p2: &Self::Process, + p2_port: &Self::Port, ); fn cluster_ids( @@ -116,15 +142,17 @@ pub trait Deploy<'a> { impl< 'a, - T: Deploy<'a, Process = N, Cluster = C, Meta = M, GraphId = R>, + T: Deploy<'a, Process = N, Cluster = C, ExternalProcess = E, Meta = M, GraphId = R>, N: Node, C: Node, + E: Node, M: Default, R, > LocalDeploy<'a> for T { type Process = N; type Cluster = C; + type ExternalProcess = E; type Meta = M; type GraphId = R; @@ -149,6 +177,10 @@ pub trait ClusterSpec<'a, D: LocalDeploy<'a> + ?Sized> { fn build(self, id: usize, name_hint: &str) -> D::Cluster; } +pub trait ExternalSpec<'a, D: LocalDeploy<'a> + ?Sized> { + fn build(self, id: usize, name_hint: &str) -> D::ExternalProcess; +} + pub trait Node { type Port; type Meta; @@ -166,3 +198,16 @@ pub trait Node { extra_stmts: Vec, ); } + +pub trait RegisterPort<'a, D: Deploy<'a> + ?Sized>: Clone { + fn register(&self, key: usize, port: D::Port); + fn raw_port(&self, key: usize) -> D::ExternalRawPort; + fn as_bytes_sink( + &self, + key: usize, + ) -> impl Future>>> + 'a; + fn as_bincode_sink( + &self, + key: usize, + ) -> impl Future>>> + 'a; +} diff --git a/hydroflow_plus/src/ir.rs b/hydroflow_plus/src/ir.rs index 886d3db50820..4d2c4f3217f1 100644 --- a/hydroflow_plus/src/ir.rs +++ 
b/hydroflow_plus/src/ir.rs @@ -10,7 +10,7 @@ use proc_macro2::{Span, TokenStream}; use quote::ToTokens; use syn::parse_quote; -use crate::deploy::Deploy; +use crate::deploy::{Deploy, RegisterPort}; use crate::location::LocationId; #[derive(Clone)] @@ -66,6 +66,7 @@ impl std::fmt::Debug for DebugPipelineFn { #[derive(Debug)] pub enum HfPlusSource { Stream(DebugExpr), + ExternalNetwork(), Iter(DebugExpr), Interval(DebugExpr), Spin(), @@ -98,10 +99,11 @@ impl<'a> HfPlusLeaf<'a> { seen_tees: &mut SeenTees<'a>, nodes: &HashMap, clusters: &HashMap, + externals: &HashMap, ) -> HfPlusLeaf<'a> { self.transform_children( |n, s| { - n.compile_network::(compile_env, s, nodes, clusters); + n.compile_network::(compile_env, s, nodes, clusters, externals); }, seen_tees, ) @@ -187,6 +189,7 @@ impl<'a> HfPlusLeaf<'a> { let location_id = match location_kind { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; assert_eq!( @@ -285,7 +288,9 @@ pub enum HfPlusNode<'a> { Network { from_location: LocationId, + from_key: Option, to_location: LocationId, + to_key: Option, serialize_pipeline: Option, instantiate_fn: DebugInstantiate<'a>, deserialize_pipeline: Option, @@ -302,170 +307,33 @@ impl<'a> HfPlusNode<'a> { seen_tees: &mut SeenTees<'a>, nodes: &HashMap, clusters: &HashMap, + externals: &HashMap, ) { self.transform_children( - |n, s| n.compile_network::(compile_env, s, nodes, clusters), + |n, s| n.compile_network::(compile_env, s, nodes, clusters, externals), seen_tees, ); if let HfPlusNode::Network { from_location, + from_key, to_location, + to_key, instantiate_fn, .. } = self { let (sink_expr, source_expr, connect_fn) = match instantiate_fn { - DebugInstantiate::Building() => { - let ((sink, source), connect_fn) = match (from_location, to_location) { - (LocationId::Process(from), LocationId::Process(to)) => { - let from_node = nodes - .get(from) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - from - ) - }) - .clone(); - let to_node = nodes - .get(to) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - to - ) - }) - .clone(); - - let sink_port = D::allocate_process_port(&from_node); - let source_port = D::allocate_process_port(&to_node); - - ( - D::o2o_sink_source( - compile_env, - &from_node, - &sink_port, - &to_node, - &source_port, - ), - Box::new(move || { - D::o2o_connect(&from_node, &sink_port, &to_node, &source_port) - }) as Box, - ) - } - (LocationId::Process(from), LocationId::Cluster(to)) => { - let from_node = nodes - .get(from) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - from - ) - }) - .clone(); - let to_node = clusters - .get(to) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - to - ) - }) - .clone(); - - let sink_port = D::allocate_process_port(&from_node); - let source_port = D::allocate_cluster_port(&to_node); - - ( - D::o2m_sink_source( - compile_env, - &from_node, - &sink_port, - &to_node, - &source_port, - ), - Box::new(move || { - D::o2m_connect(&from_node, &sink_port, &to_node, &source_port) - }) as Box, - ) - } - (LocationId::Cluster(from), LocationId::Process(to)) => { - let from_node = clusters - .get(from) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - from - ) - }) - .clone(); - let to_node = nodes - .get(to) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: 
{}", - to - ) - }) - .clone(); - - let sink_port = D::allocate_cluster_port(&from_node); - let source_port = D::allocate_process_port(&to_node); - - ( - D::m2o_sink_source( - compile_env, - &from_node, - &sink_port, - &to_node, - &source_port, - ), - Box::new(move || { - D::m2o_connect(&from_node, &sink_port, &to_node, &source_port) - }) as Box, - ) - } - (LocationId::Cluster(from), LocationId::Cluster(to)) => { - let from_node = clusters - .get(from) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - from - ) - }) - .clone(); - let to_node = clusters - .get(to) - .unwrap_or_else(|| { - panic!( - "A location used in the graph was not instantiated: {}", - to - ) - }) - .clone(); - - let sink_port = D::allocate_cluster_port(&from_node); - let source_port = D::allocate_cluster_port(&to_node); - - ( - D::m2m_sink_source( - compile_env, - &from_node, - &sink_port, - &to_node, - &source_port, - ), - Box::new(move || { - D::m2m_connect(&from_node, &sink_port, &to_node, &source_port) - }) as Box, - ) - } - }; - - (sink, source, connect_fn) - } + DebugInstantiate::Building() => instantiate_network::( + from_location, + *from_key, + to_location, + *to_key, + nodes, + clusters, + externals, + compile_env, + ), DebugInstantiate::Finalized(_, _, _) => panic!("network already finalized"), }; @@ -662,49 +530,58 @@ impl<'a> HfPlusNode<'a> { source, location_kind, } => { - let source_id = *next_stmt_id; - *next_stmt_id += 1; + let location_id = match location_kind { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::ExternalProcess(id) => id, + }; - let source_ident = - syn::Ident::new(&format!("stream_{}", source_id), Span::call_site()); + if let HfPlusSource::ExternalNetwork() = source { + (syn::Ident::new("DUMMY", Span::call_site()), *location_id) + } else { + let source_id = *next_stmt_id; + *next_stmt_id += 1; - let source_stmt = match source { - HfPlusSource::Stream(expr) => { - parse_quote! { - #source_ident = source_stream(#expr); + let source_ident = + syn::Ident::new(&format!("stream_{}", source_id), Span::call_site()); + + let source_stmt = match source { + HfPlusSource::Stream(expr) => { + parse_quote! { + #source_ident = source_stream(#expr); + } } - } - HfPlusSource::Iter(expr) => { - parse_quote! { - #source_ident = source_iter(#expr); + HfPlusSource::ExternalNetwork() => { + unreachable!() } - } - HfPlusSource::Interval(expr) => { - parse_quote! { - #source_ident = source_interval(#expr); + HfPlusSource::Iter(expr) => { + parse_quote! { + #source_ident = source_iter(#expr); + } } - } - HfPlusSource::Spin() => { - parse_quote! { - #source_ident = spin(); + HfPlusSource::Interval(expr) => { + parse_quote! { + #source_ident = source_interval(#expr); + } } - } - }; - let location_id = match location_kind { - LocationId::Process(id) => id, - LocationId::Cluster(id) => id, - }; + HfPlusSource::Spin() => { + parse_quote! 
{ + #source_ident = spin(); + } + } + }; - graph_builders - .entry(*location_id) - .or_default() - .add_statement(source_stmt); + graph_builders + .entry(*location_id) + .or_default() + .add_statement(source_stmt); - (source_ident, *location_id) + (source_ident, *location_id) + } } HfPlusNode::CycleSource { @@ -714,6 +591,7 @@ impl<'a> HfPlusNode<'a> { let location_id = match location_kind { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; (ident.clone(), *location_id) @@ -1198,7 +1076,9 @@ impl<'a> HfPlusNode<'a> { HfPlusNode::Network { from_location: _, + from_key: _, to_location, + to_key: _, serialize_pipeline, instantiate_fn, deserialize_pipeline, @@ -1232,6 +1112,7 @@ impl<'a> HfPlusNode<'a> { let to_id = match to_location { LocationId::Process(id) => id, LocationId::Cluster(id) => id, + LocationId::ExternalProcess(_) => panic!(), }; let receiver_builder = graph_builders.entry(*to_id).or_default(); @@ -1256,3 +1137,155 @@ impl<'a> HfPlusNode<'a> { } } } + +#[expect(clippy::too_many_arguments, reason = "networking internals")] +fn instantiate_network<'a, D: Deploy<'a> + 'a>( + from_location: &mut LocationId, + from_key: Option, + to_location: &mut LocationId, + _to_key: Option, + nodes: &HashMap, + clusters: &HashMap, + externals: &HashMap, + compile_env: &D::CompileEnv, +) -> (syn::Expr, syn::Expr, Box) { + let ((sink, source), connect_fn) = match (from_location, to_location) { + (LocationId::Process(from), LocationId::Process(to)) => { + let from_node = nodes + .get(from) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", from) + }) + .clone(); + let to_node = nodes + .get(to) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_process_port(&from_node); + let source_port = D::allocate_process_port(&to_node); + + ( + D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port), + Box::new(move || D::o2o_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) + } + (LocationId::Process(from), LocationId::Cluster(to)) => { + let from_node = nodes + .get(from) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", from) + }) + .clone(); + let to_node = clusters + .get(to) + .unwrap_or_else(|| { + panic!("A cluster used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_process_port(&from_node); + let source_port = D::allocate_cluster_port(&to_node); + + ( + D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port), + Box::new(move || D::o2m_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) + } + (LocationId::Cluster(from), LocationId::Process(to)) => { + let from_node = clusters + .get(from) + .unwrap_or_else(|| { + panic!("A cluster used in the graph was not instantiated: {}", from) + }) + .clone(); + let to_node = nodes + .get(to) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_cluster_port(&from_node); + let source_port = D::allocate_process_port(&to_node); + + ( + D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port), + Box::new(move || D::m2o_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) + } + (LocationId::Cluster(from), LocationId::Cluster(to)) => { + let from_node = clusters + .get(from) + 
.unwrap_or_else(|| { + panic!("A cluster used in the graph was not instantiated: {}", from) + }) + .clone(); + let to_node = clusters + .get(to) + .unwrap_or_else(|| { + panic!("A cluster used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_cluster_port(&from_node); + let source_port = D::allocate_cluster_port(&to_node); + + ( + D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port), + Box::new(move || D::m2m_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) + } + (LocationId::ExternalProcess(from), LocationId::Process(to)) => { + let from_node = externals + .get(from) + .unwrap_or_else(|| { + panic!( + "A external used in the graph was not instantiated: {}", + from + ) + }) + .clone(); + + let to_node = nodes + .get(to) + .unwrap_or_else(|| { + panic!("A process used in the graph was not instantiated: {}", to) + }) + .clone(); + + let sink_port = D::allocate_external_port(&from_node); + let source_port = D::allocate_process_port(&to_node); + + from_node.register(from_key.unwrap(), sink_port.clone()); + + ( + ( + parse_quote!(DUMMY), + D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port), + ), + Box::new(move || D::e2o_connect(&from_node, &sink_port, &to_node, &source_port)) + as Box, + ) + } + (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => { + todo!("NYI") + } + (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => { + panic!("Cannot send from external to external") + } + (LocationId::Process(_from), LocationId::ExternalProcess(_to)) => { + todo!("NYI") + } + (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => { + todo!("NYI") + } + }; + (sink, source, connect_fn) +} diff --git a/hydroflow_plus/src/location.rs b/hydroflow_plus/src/location.rs index 1922c0730b60..0b361bd6d7e3 100644 --- a/hydroflow_plus/src/location.rs +++ b/hydroflow_plus/src/location.rs @@ -1,15 +1,44 @@ use std::marker::PhantomData; +use serde::de::DeserializeOwned; +use serde::Serialize; + #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum LocationId { Process(usize), Cluster(usize), + ExternalProcess(usize), } pub trait Location { fn id(&self) -> LocationId; } +pub struct ExternalBytesPort { + pub(crate) process_id: usize, + pub(crate) port_id: usize, +} + +pub struct ExternalBincodePort { + pub(crate) process_id: usize, + pub(crate) port_id: usize, + pub(crate) _phantom: PhantomData, +} + +pub struct ExternalProcess
<P>
{ + pub(crate) id: usize, + pub(crate) _phantom: PhantomData
<P>
, +} + +impl
<P>
Clone for ExternalProcess
<P>
{ + fn clone(&self) -> Self { + ExternalProcess { + id: self.id, + _phantom: PhantomData, + } + } +} + pub struct Process
<P>
{ pub(crate) id: usize, pub(crate) _phantom: PhantomData
<P>
, diff --git a/hydroflow_plus/src/persist_pullup.rs b/hydroflow_plus/src/persist_pullup.rs index b2aa48dc88be..36c18d10544f 100644 --- a/hydroflow_plus/src/persist_pullup.rs +++ b/hydroflow_plus/src/persist_pullup.rs @@ -114,7 +114,9 @@ fn persist_pullup_node<'a>( } => { if let HfPlusNode::Network { from_location, + from_key, to_location, + to_key, serialize_pipeline, instantiate_fn, deserialize_pipeline, @@ -123,7 +125,9 @@ fn persist_pullup_node<'a>( { *node = HfPlusNode::Persist(Box::new(HfPlusNode::Network { from_location, + from_key, to_location, + to_key, serialize_pipeline, instantiate_fn, deserialize_pipeline, diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 74317059d063..4884c52b379d 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -675,7 +675,7 @@ fn serialize_bincode(is_demux: bool) -> Pipeline { } } -fn deserialize_bincode(tagged: bool) -> Pipeline { +pub(super) fn deserialize_bincode(tagged: bool) -> Pipeline { let root = get_this_crate(); let t_type: syn::Type = stageleft::quote_type::(); @@ -714,7 +714,9 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { self.ir_leaves, HfPlusNode::Network { from_location: self.location_kind, + from_key: None, to_location: other.id(), + to_key: None, serialize_pipeline, instantiate_fn: DebugInstantiate::Building(), deserialize_pipeline, @@ -735,7 +737,9 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { self.ir_leaves, HfPlusNode::Network { from_location: self.location_kind, + from_key: None, to_location: other.id(), + to_key: None, serialize_pipeline: None, instantiate_fn: DebugInstantiate::Building(), deserialize_pipeline: if N::is_tagged() { diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index 0c909a1a900d..f55badf07484 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use futures::SinkExt; use hydro_deploy::gcp::GcpNetwork; use hydro_deploy::{Deployment, Host}; use hydroflow_plus_deploy::TrybuildHost; @@ -39,8 +40,9 @@ async fn main() { }; let builder = hydroflow_plus::FlowBuilder::new(); - let (p1, p2) = hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder); - let _nodes = builder + let (external_process, external_port, p1, p2) = + hydroflow_plus_test::distributed::first_ten::first_ten_distributed(&builder); + let nodes = builder .with_default_optimize() .with_process( &p1, @@ -50,7 +52,24 @@ async fn main() { &p2, TrybuildHost::new(create_host(&mut deployment)).rustflags(rustflags), ) + .with_external(&external_process, deployment.Localhost() as Arc) .deploy(&mut deployment); - deployment.run_ctrl_c().await.unwrap(); + deployment.deploy().await.unwrap(); + + let mut external_port = nodes.connect_sink_bincode(external_port).await; + + deployment.start().await.unwrap(); + + println!("Enter characters and press enter to send them over the network (ctrl-d to stop):"); + loop { + let mut in_line = String::new(); + if std::io::stdin().read_line(&mut in_line).unwrap() == 0 { + break; + } + + external_port.send(in_line).await.unwrap(); + } + + deployment.stop().await.unwrap(); } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap index 3957c9875348..bc6617bf546e 100644 --- 
a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap @@ -17,9 +17,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Process( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap index b406d3d71bb8..1a04dda0fb11 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__many_to_many__tests__many_to_many.snap @@ -9,9 +9,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap index 8b3bc45fafbc..c33d9b128113 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__map_reduce__tests__map_reduce_ir.snap @@ -14,9 +14,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Process( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -49,9 +51,11 @@ expression: built.ir() from_location: Process( 0, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 699752d6a65f..46452afa88e2 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -366,9 +366,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -779,9 +781,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -837,9 +841,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -938,9 +944,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -1506,9 +1514,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -1564,9 +1574,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( 
Operator { @@ -1665,9 +1677,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -2120,9 +2134,11 @@ expression: built.ir() from_location: Cluster( 3, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -2158,9 +2174,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -2475,9 +2493,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -2807,9 +2827,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3150,9 +3172,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3433,9 +3457,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3770,9 +3796,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3826,9 +3854,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3904,9 +3934,11 @@ expression: built.ir() from_location: Cluster( 3, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -3939,9 +3971,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -4282,9 +4316,11 @@ expression: built.ir() from_location: Cluster( 0, ), + from_key: None, to_location: Cluster( 2, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -4565,9 +4601,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 3, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -4909,9 +4947,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5030,9 +5070,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5195,9 +5237,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5316,9 +5360,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5518,9 +5564,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5639,9 +5687,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5844,9 +5894,11 @@ 
expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -5965,9 +6017,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6123,9 +6177,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6157,9 +6213,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6278,9 +6336,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6312,9 +6372,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6468,9 +6530,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6762,9 +6826,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6796,9 +6862,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -6968,9 +7036,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7002,9 +7072,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7166,9 +7238,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7200,9 +7274,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7383,9 +7459,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7674,9 +7752,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7708,9 +7788,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -7830,9 +7912,11 @@ expression: built.ir() from_location: Cluster( 2, ), + from_key: None, to_location: Cluster( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap index 8b7dd0063c5f..1889a09ef06e 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap +++ 
b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__simple_cluster__tests__simple_cluster.snap @@ -9,9 +9,11 @@ expression: built.ir() from_location: Cluster( 1, ), + from_key: None, to_location: Process( 0, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -39,9 +41,11 @@ expression: built.ir() from_location: Process( 0, ), + from_key: None, to_location: Cluster( 1, ), + to_key: None, serialize_pipeline: Some( Operator( Operator { diff --git a/hydroflow_plus_test/src/distributed/first_ten.rs b/hydroflow_plus_test/src/distributed/first_ten.rs index 515f9d1edfc2..ea10a880b729 100644 --- a/hydroflow_plus_test/src/distributed/first_ten.rs +++ b/hydroflow_plus_test/src/distributed/first_ten.rs @@ -1,4 +1,5 @@ use hydroflow_plus::*; +use location::{ExternalBincodePort, ExternalProcess}; use serde::{Deserialize, Serialize}; use stageleft::*; @@ -10,22 +11,42 @@ struct SendOverNetwork { pub struct P1 {} pub struct P2 {} -pub fn first_ten_distributed(flow: &FlowBuilder) -> (Process<P1>, Process<P2>) { +pub fn first_ten_distributed( + flow: &FlowBuilder, +) -> ( + ExternalProcess<()>, + ExternalBincodePort<String>, + Process<P1>, + Process<P2>, +) { + let external_process = flow.external_process::<()>(); let process = flow.process::<P1>(); let second_process = flow.process::<P2>(); + let (numbers_external_port, numbers_external) = + flow.source_external_bincode(&external_process, &process); + numbers_external.for_each(q!(|n| println!("hi: {:?}", n))); + let numbers = flow.source_iter(&process, q!(0..10)); numbers .map(q!(|n| SendOverNetwork { n })) .send_bincode(&second_process) .for_each(q!(|n: SendOverNetwork| println!("{}", n.n))); // TODO(shadaj): why is the explicit type required here? - (process, second_process) + ( + external_process, + numbers_external_port, + process, + second_process, + ) } #[cfg(test)] mod tests { - use hydro_deploy::Deployment; + use std::sync::Arc; + + use futures::SinkExt; + use hydro_deploy::{Deployment, Host}; use hydroflow_plus_deploy::{DeployCrateWrapper, TrybuildHost}; #[tokio::test] @@ -33,7 +54,8 @@ mod tests { let mut deployment = Deployment::new(); let builder = hydroflow_plus::FlowBuilder::new(); - let (first_node, second_node) = super::first_ten_distributed(&builder); + let (external_process, external_port, first_node, second_node) = + super::first_ten_distributed(&builder); let built = builder.with_default_optimize(); @@ -43,14 +65,27 @@ mod tests { let nodes = built .with_process(&first_node, TrybuildHost::new(deployment.Localhost())) .with_process(&second_node, TrybuildHost::new(deployment.Localhost())) + .with_external(&external_process, deployment.Localhost() as Arc<dyn Host>) .deploy(&mut deployment); deployment.deploy().await.unwrap(); + let mut external_port = nodes.connect_sink_bincode(external_port).await; + + let mut first_node_stdout = nodes.get_process(&first_node).stdout().await; let mut second_node_stdout = nodes.get_process(&second_node).stdout().await; deployment.start().await.unwrap(); + external_port + .send("this is some string".to_string()) + .await + .unwrap(); + assert_eq!( + first_node_stdout.recv().await.unwrap(), + "hi: \"this is some string\"" + ); + for i in 0..10 { assert_eq!(second_node_stdout.recv().await.unwrap(), i.to_string()); } diff --git a/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap b/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap index ea6ccee8c0be..fca0ec10245f
100644 --- a/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap +++ b/hydroflow_plus_test/src/distributed/snapshots/hydroflow_plus_test__distributed__first_ten__tests__first_ten_distributed.snap @@ -4,14 +4,49 @@ expression: built.ir() --- [ ForEach { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork , () > ({ use crate :: __staged :: distributed :: first_ten :: * ; | n : SendOverNetwork | println ! ("{}" , n . n) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < std :: string :: String , () > ({ use crate :: __staged :: distributed :: first_ten :: * ; | n | println ! ("hi: {:?}" , n) }), input: Network { - from_location: Process( + from_location: ExternalProcess( + 0, + ), + from_key: Some( 0, ), to_location: Process( 1, ), + to_key: None, + serialize_pipeline: None, + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { hydroflow_plus :: runtime_support :: bincode :: deserialize :: < std :: string :: String > (& res . unwrap ()) . unwrap () }", + ], + }, + ), + ), + input: Source { + source: ExternalNetwork, + location_kind: ExternalProcess( + 0, + ), + }, + }, + }, + ForEach { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: distributed :: first_ten :: SendOverNetwork , () > ({ use crate :: __staged :: distributed :: first_ten :: * ; | n : SendOverNetwork | println ! ("{}" , n . n) }), + input: Network { + from_location: Process( + 1, + ), + from_key: None, + to_location: Process( + 2, + ), + to_key: None, serialize_pipeline: Some( Operator( Operator { @@ -40,7 +75,7 @@ expression: built.ir() { use crate :: __staged :: distributed :: first_ten :: * ; 0 .. 10 }, ), location_kind: Process( - 0, + 1, ), }, }, diff --git a/stageleft/src/type_name.rs b/stageleft/src/type_name.rs index cd3c79ec16dd..1077e54dde30 100644 --- a/stageleft/src/type_name.rs +++ b/stageleft/src/type_name.rs @@ -111,6 +111,23 @@ impl VisitMut for RewriteAlloc { .chain(i.segments.iter().skip(3).cloned()), ), }; + } else if i.segments.iter().take(2).collect::<Vec<_>>() + == vec![ + &syn::PathSegment::from(syn::Ident::new("bytes", Span::call_site())), + &syn::PathSegment::from(syn::Ident::new("bytes", Span::call_site())), + ] + { + *i = syn::Path { + leading_colon: i.leading_colon, + segments: syn::punctuated::Punctuated::from_iter( + vec![syn::PathSegment::from(syn::Ident::new( + "bytes", + Span::call_site(), + ))] + .into_iter() + .chain(i.segments.iter().skip(2).cloned()), + ), + }; } else if let Some((macro_name, final_name)) = &self.mapping { if i.segments.first().unwrap().ident == macro_name { *i.segments.first_mut().unwrap() =