Skip to content

Commit

Permalink
feat(hydroflow_plus)!: refactor API to have no-tick semantics by defa…
Browse files Browse the repository at this point in the history
…ult (#1421)

Now, by default streams exist at a "top-level" where there are no ticks
and operators run over the entire collection. To perform iterative
computations, developers must explicitly entire a tick domain (using
`tick_batch`), and return to the outer domain (using `all_ticks`).
  • Loading branch information
shadaj authored Aug 26, 2024
1 parent 6568263 commit 536e644
Show file tree
Hide file tree
Showing 29 changed files with 6,190 additions and 412 deletions.
83 changes: 62 additions & 21 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::rc::Rc;
use std::time::Duration;

use hydroflow::futures::stream::Stream as FuturesStream;
use hydroflow::{tokio, tokio_stream};
use internal::TokenStream;
use proc_macro2::Span;
use quote::quote;
Expand All @@ -13,7 +14,7 @@ use stageleft::*;

use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Cluster, Location, LocationId, Process};
use crate::stream::{Async, Windowed};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::{HfCycle, RuntimeContext, Stream};

pub mod built;
Expand Down Expand Up @@ -189,22 +190,22 @@ impl<'a> FlowBuilder<'a> {
}
}

pub fn spin<L: Location>(&self, on: &L) -> Stream<'a, (), Async, L> {
pub fn spin<L: Location>(&self, on: &L) -> Stream<'a, (), Unbounded, NoTick, L> {
Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Source {
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
source: HfPlusSource::Spin(),
location_kind: on.id(),
},
})),
)
}

pub fn spin_batch<L: Location>(
&self,
on: &L,
batch_size: impl Quoted<'a, usize> + Copy + 'a,
) -> Stream<'a, (), Windowed, L> {
) -> Stream<'a, (), Bounded, Tick, L> {
self.spin(on)
.flat_map(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
Expand All @@ -215,50 +216,50 @@ impl<'a> FlowBuilder<'a> {
&self,
on: &L,
e: impl Quoted<'a, E>,
) -> Stream<'a, T, Async, L> {
) -> Stream<'a, T, Unbounded, NoTick, L> {
let e = e.splice();

Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Source {
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
source: HfPlusSource::Stream(e.into()),
location_kind: on.id(),
},
})),
)
}

pub fn source_iter<T, E: IntoIterator<Item = T>, L: Location>(
&self,
on: &L,
e: impl Quoted<'a, E>,
) -> Stream<'a, T, Windowed, L> {
) -> Stream<'a, T, Bounded, NoTick, L> {
let e = e.splice();

Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Source {
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
source: HfPlusSource::Iter(e.into()),
location_kind: on.id(),
},
})),
)
}

pub fn source_interval<L: Location>(
&self,
on: &L,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, (), Async, L> {
) -> Stream<'a, (), Unbounded, NoTick, L> {
let interval = interval.splice();

Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Source {
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
source: HfPlusSource::Interval(interval.into()),
location_kind: on.id(),
},
})),
)
}

Expand All @@ -267,19 +268,59 @@ impl<'a> FlowBuilder<'a> {
on: &L,
delay: impl Quoted<'a, Duration> + Copy + 'a,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, hydroflow::tokio::time::Instant, Async, L> {
) -> Stream<'a, tokio::time::Instant, Unbounded, NoTick, L> {
self.source_stream(
on,
q!(hydroflow::tokio_stream::wrappers::IntervalStream::new(
hydroflow::tokio::time::interval_at(
hydroflow::tokio::time::Instant::now() + delay,
interval
)
q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)),
)
}

pub fn cycle<T, W, L: Location>(&self, on: &L) -> (HfCycle<'a, T, W, L>, Stream<'a, T, W, L>) {
#[allow(clippy::type_complexity)]
pub fn cycle<T, W, L: Location>(
&self,
on: &L,
) -> (HfCycle<'a, T, W, NoTick, L>, Stream<'a, T, W, NoTick, L>) {
let next_id = {
let on_id = match on.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
};

let mut cycle_ids = self.cycle_ids.borrow_mut();
let next_id_entry = cycle_ids.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfCycle {
ident: ident.clone(),
location_kind: on.id(),
ir_leaves: self.ir_leaves().clone(),
_phantom: PhantomData,
},
Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
ident,
location_kind: on.id(),
})),
),
)
}

#[allow(clippy::type_complexity)]
pub fn tick_cycle<T, W, L: Location>(
&self,
on: &L,
) -> (HfCycle<'a, T, W, Tick, L>, Stream<'a, T, W, Tick, L>) {
let next_id = {
let on_id = match on.id() {
LocationId::Process(id) => id,
Expand Down
25 changes: 20 additions & 5 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use std::marker::PhantomData;

use crate::builder::FlowLeaves;
use crate::ir::HfPlusLeaf;
use crate::ir::{HfPlusLeaf, HfPlusNode};
use crate::location::{Location, LocationId};
use crate::stream::{NoTick, Tick};
use crate::Stream;

/// Represents a fixpoint cycle in the graph that will be fulfilled
/// by a stream that is not yet known.
///
/// See [`Stream`] for an explainer on the type parameters.
pub struct HfCycle<'a, T, W, N: Location> {
pub struct HfCycle<'a, T, W, C, N: Location> {
pub(crate) ident: syn::Ident,
pub(crate) location_kind: LocationId,
pub(crate) ir_leaves: FlowLeaves<'a>,
pub(crate) _phantom: PhantomData<(N, &'a mut &'a (), T, W)>,
pub(crate) _phantom: PhantomData<(N, &'a mut &'a (), T, W, C)>,
}

impl<'a, T, W, N: Location> HfCycle<'a, T, W, N> {
pub fn complete(self, stream: Stream<'a, T, W, N>) {
impl<'a, T, W, N: Location> HfCycle<'a, T, W, Tick, N> {
pub fn complete(self, stream: Stream<'a, T, W, Tick, N>) {
let ident = self.ident;

self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
Expand All @@ -27,3 +28,17 @@ impl<'a, T, W, N: Location> HfCycle<'a, T, W, N> {
});
}
}

impl<'a, T, W, N: Location> HfCycle<'a, T, W, NoTick, N> {
pub fn complete(self, stream: Stream<'a, T, W, NoTick, N>) {
let ident = self.ident;

self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind,
input: Box::new(HfPlusNode::Unpersist(
Box::new(stream.ir_node.into_inner())
)),
});
}
}
6 changes: 6 additions & 0 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ pub enum HfPlusNode<'a> {
},

Persist(Box<HfPlusNode<'a>>),
Unpersist(Box<HfPlusNode<'a>>),
Delta(Box<HfPlusNode<'a>>),

Union(Box<HfPlusNode<'a>>, Box<HfPlusNode<'a>>),
Expand Down Expand Up @@ -519,6 +520,7 @@ impl<'a> HfPlusNode<'a> {
}

HfPlusNode::Persist(inner) => transform(inner.as_mut(), seen_tees),
HfPlusNode::Unpersist(inner) => transform(inner.as_mut(), seen_tees),
HfPlusNode::Delta(inner) => transform(inner.as_mut(), seen_tees),

HfPlusNode::Union(left, right) => {
Expand Down Expand Up @@ -623,6 +625,10 @@ impl<'a> HfPlusNode<'a> {
(persist_ident, location)
}

HfPlusNode::Unpersist(_) => {
panic!("Unpersist is a marker node and should have been optimized away. This is likely a compiler bug.")
}

HfPlusNode::Delta(inner) => {
let (inner_ident, location) = inner.emit(graph_builders, built_tees, next_stmt_id);

Expand Down
Loading

0 comments on commit 536e644

Please sign in to comment.