Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus)!: strongly-typed runtime cluster IDs #1477

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,23 @@ stream.for_each(q!(|x| println!("{}", x)))
## Sending Data
Because clusters represent a set of instances, adding networking requires us to specify _which_ instance(s) to send data to. Clusters provide different types depending on if the source or receiver is a cluster or a process.

Elements in a cluster are identified by a **cluster ID** (a `u32`). To get the IDs of all instances in a cluster, use the `ids` method on cluster, which returns a runtime expression of type `&Vec<u32>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.
Elements in a cluster are identified by a **cluster ID** (a `ClusterId<C>` where `C` is the typetag of the cluster). To get the IDs of all instances in a cluster, use the `members` method on cluster, which returns a runtime expression of type `&Vec<ClusterId<_>>` (which can only be used inside `q!()` or as an argument to `source_iter`). All IDs always are ranging from 0 through the length of the IDs vector.

This can then be passed into `source_iter` to load the IDs into the graph.
```rust
let stream = process.source_iter(cluster.members()).cloned();
```

### One-to-Many
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(u32, T)` and sends each `T` element to the instance with the matching `u32` ID.
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(ClusterId<_>, T)` and sends each `T` element to the instance with the matching ID.

This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number:
```rust
let cluster_ids = cluster.members();
let stream = process.source_iter(q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
i % cluster_ids.len() as u32,
ClusterId::from_raw(i % cluster_ids.len() as u32),
x
)))
.send_bincode(&cluster);
Expand Down
53 changes: 40 additions & 13 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use quote::quote;
use runtime_support::FreeVariable;
use stageleft::*;

use super::staging_util::get_this_crate;
use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::RuntimeContext;
use crate::{ClusterId, RuntimeContext};

pub mod built;
pub mod deploy;
Expand All @@ -31,13 +32,20 @@ pub struct FlowStateInner {

pub type FlowState = Rc<RefCell<FlowStateInner>>;

#[derive(Copy, Clone)]
pub struct ClusterIds<'a> {
pub struct ClusterIds<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a Vec<u32>>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
impl<'a, C> Clone for ClusterIds<'a, C> {
fn clone(&self) -> Self {
*self
}
}

impl<'a, C> Copy for ClusterIds<'a, C> {}

impl<'a, C> FreeVariable<&'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -46,19 +54,33 @@ impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
&format!("__hydroflow_plus_cluster_ids_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type = quote_type::<C>();
(
None,
Some(
quote! { unsafe { ::std::mem::transmute::<_, &::std::vec::Vec<#root::ClusterId<#c_type>>>(#ident) } },
),
)
}
}

impl<'a> Quoted<'a, &'a Vec<u32>> for ClusterIds<'a> {}
impl<'a, C> Quoted<'a, &'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {}

#[derive(Copy, Clone)]
pub(crate) struct ClusterSelfId<'a> {
pub(crate) struct ClusterSelfId<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a u32>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
impl<'a, C> Clone for ClusterSelfId<'a, C> {
fn clone(&self) -> Self {
*self
}
}

impl<'a, C> Copy for ClusterSelfId<'a, C> {}

impl<'a, C> FreeVariable<ClusterId<C>> for ClusterSelfId<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -67,11 +89,16 @@ impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
&format!("__hydroflow_plus_cluster_self_id_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type: syn::Type = quote_type::<C>();
(
None,
Some(quote! { #root::ClusterId::<#c_type>::from_raw(#ident) }),
)
}
}

impl<'a> Quoted<'a, u32> for ClusterSelfId<'a> {}
impl<'a, C> Quoted<'a, ClusterId<C>> for ClusterSelfId<'a, C> {}

pub struct FlowBuilder<'a> {
flow_state: FlowState,
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow::futures::StreamExt;
use hydroflow::util::deploy::ConnectedSource;
use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -29,7 +29,6 @@ use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::ConnectedSink;

pub struct HydroDeploy {}

Expand Down
7 changes: 3 additions & 4 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

use crate::util::deploy::{
use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;

use hydroflow::util::deploy::DeployPorts;
use stageleft::{Quoted, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::DeployPorts;

pub struct DeployRuntime {}

Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod singleton;
pub use singleton::{Optional, Singleton};

pub mod location;
pub use location::{Cluster, Location, Process};
pub use location::{Cluster, ClusterId, Location, Process};

pub mod deploy;
pub use deploy::{ClusterSpec, Deploy, ProcessSpec};
Expand All @@ -43,6 +43,8 @@ pub mod profiler;

pub mod properties;

mod staging_util;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
Expand Down
Loading
Loading