Skip to content

Commit

Permalink
feat(hydroflow_plus): add unbounded top-level singletons (#1427)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Aug 27, 2024
1 parent f5f1eb0 commit 82de6f5
Show file tree
Hide file tree
Showing 9 changed files with 513 additions and 260 deletions.
11 changes: 7 additions & 4 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::cycle::CycleCollection;
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Cluster, Location, LocationId, Process};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::{HfCycle, RuntimeContext, Stream};
use crate::{HfCycle, Optional, RuntimeContext, Stream};

pub mod built;
pub mod deploy;
Expand Down Expand Up @@ -251,10 +251,10 @@ impl<'a> FlowBuilder<'a> {
&self,
on: &L,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, (), Unbounded, NoTick, L> {
) -> Optional<'a, (), Unbounded, NoTick, L> {
let interval = interval.splice();

Stream::new(
Optional::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
Expand All @@ -269,13 +269,16 @@ impl<'a> FlowBuilder<'a> {
on: &L,
delay: impl Quoted<'a, Duration> + Copy + 'a,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, tokio::time::Instant, Unbounded, NoTick, L> {
) -> Optional<'a, tokio::time::Instant, Unbounded, NoTick, L> {
self.source_stream(
on,
q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)),
)
.tick_batch()
.first()
.latest()
}

pub fn cycle<S: CycleCollection<'a>>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) {
Expand Down
162 changes: 142 additions & 20 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowLeaves;
use crate::cycle::CycleCollection;
use crate::ir::{HfPlusLeaf, HfPlusNode};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Location, LocationId};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::Stream;
Expand Down Expand Up @@ -90,11 +90,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Singleton<'a, T, W, C, N> {
}
}

impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> {
pub fn map<U, F: Fn(T) -> U + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Singleton<'a, U, Bounded, Tick, N> {
) -> Singleton<'a, U, W, C, N> {
Singleton::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -108,7 +108,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Stream<'a, U, Bounded, Tick, N> {
) -> Stream<'a, U, W, C, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -122,7 +122,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Bounded, Tick, N> {
) -> Optional<'a, T, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -136,7 +136,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -146,7 +146,9 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
},
)
}
}

impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Bounded, Tick, N>>,
Expand Down Expand Up @@ -194,6 +196,14 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
)
}

pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
)
}

pub fn defer_tick(self) -> Singleton<'a, T, Bounded, Tick, N> {
Singleton::new(
self.location_kind,
Expand All @@ -219,6 +229,33 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
}
}

impl<'a, T, N: Location> Singleton<'a, T, Unbounded, NoTick, N> {
pub fn latest_tick(self) -> Singleton<'a, T, Bounded, Tick, N> {
Singleton::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
)
}

pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Unbounded, NoTick, N>>,
) -> Optional<'a, (T, O), Unbounded, NoTick, N>
where
O: Clone,
{
let other: Optional<'a, O, Unbounded, NoTick, N> = other.into();
if self.location_kind != other.location_kind {
panic!("cross_singleton must be called on streams on the same node");
}

self.latest_tick()
.cross_singleton(other.latest_tick())
.latest()
}
}

pub struct Optional<'a, T, W, C, N: Location> {
pub(crate) location_kind: LocationId,

Expand Down Expand Up @@ -274,6 +311,29 @@ impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, Tick, N>
}
}

impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, NoTick, N> {
type Location = N;

fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self {
Optional::new(
l,
ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
ident,
location_kind: l,
})),
)
}

fn complete(self, ident: syn::Ident) {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind,
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
});
}
}

impl<'a, T, W, C, N: Location> From<Singleton<'a, T, W, C, N>> for Optional<'a, T, W, C, N> {
fn from(singleton: Singleton<'a, T, W, C, N>) -> Self {
Optional::some(singleton)
Expand Down Expand Up @@ -305,20 +365,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Optional<'a, T, W, C, N> {
}
}

impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
// TODO(shadaj): this is technically incorrect; we should only return the first element of the stream
pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
self.ir_node.into_inner(),
)
}

impl<'a, T, W, C, N: Location> Optional<'a, T, W, C, N> {
pub fn map<U, F: Fn(T) -> U + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -332,7 +383,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Stream<'a, U, Bounded, Tick, N> {
) -> Stream<'a, U, W, C, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -346,7 +397,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Bounded, Tick, N> {
) -> Optional<'a, T, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -360,7 +411,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -370,6 +421,17 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
},
)
}
}

impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
// TODO(shadaj): this is technically incorrect; we should only return the first element of the stream
pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
self.ir_node.into_inner(),
)
}

pub fn cross_singleton<O>(
self,
Expand Down Expand Up @@ -436,6 +498,14 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
)
}

pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
)
}

pub fn defer_tick(self) -> Optional<'a, T, Bounded, Tick, N> {
Optional::new(
self.location_kind,
Expand All @@ -460,3 +530,55 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
)
}
}

impl<'a, T, N: Location> Optional<'a, T, Unbounded, NoTick, N> {
pub fn latest_tick(self) -> Optional<'a, T, Bounded, Tick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
)
}

pub fn tick_samples(self) -> Stream<'a, T, Unbounded, NoTick, N> {
self.latest_tick().all_ticks()
}

pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Unbounded, NoTick, N>>,
) -> Optional<'a, (T, O), Unbounded, NoTick, N>
where
O: Clone,
{
let other: Optional<'a, O, Unbounded, NoTick, N> = other.into();
if self.location_kind != other.location_kind {
panic!("cross_singleton must be called on streams on the same node");
}

self.latest_tick()
.cross_singleton(other.latest_tick())
.latest()
}

pub fn sample_every(
self,
duration: impl Quoted<'a, std::time::Duration> + Copy + 'a,
) -> Stream<'a, T, Unbounded, NoTick, N> {
let interval = duration.splice();

let samples = Stream::<'a, hydroflow::tokio::time::Instant, Bounded, Tick, N>::new(
self.location_kind,
self.ir_leaves.clone(),
HfPlusNode::Source {
source: HfPlusSource::Interval(interval.into()),
location_kind: self.location_kind,
},
);

self.latest_tick()
.continue_if(samples.first())
.latest()
.tick_samples()
}
}
45 changes: 44 additions & 1 deletion hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> {
)
}

pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> {
pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N>
where
T: Clone,
{
Stream::new(
self.location_kind,
self.ir_leaves,
Expand Down Expand Up @@ -407,6 +410,13 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> {
)
}

pub fn tick_prefix(self) -> Stream<'a, T, Bounded, Tick, N>
where
T: Clone,
{
self.tick_batch().persist()
}

pub fn inspect<F: Fn(&T) + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
Expand Down Expand Up @@ -449,6 +459,39 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> {

self.tick_batch().continue_if(samples.first()).all_ticks()
}

pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
self,
init: impl IntoQuotedMut<'a, I>,
comb: impl IntoQuotedMut<'a, F>,
) -> Singleton<'a, A, Unbounded, NoTick, N> {
// unbounded singletons are represented as a stream
// which produces all values from all ticks every tick,
// so delta will always give the lastest aggregation
Singleton::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::Fold {
init: init.splice().into(),
acc: comb.splice().into(),
input: Box::new(self.ir_node.into_inner()),
})),
)
}

pub fn reduce<F: Fn(&mut T, T) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::Reduce {
f: comb.splice().into(),
input: Box::new(self.ir_node.into_inner()),
})),
)
}
}

impl<'a, T, C, N: Location> Stream<'a, T, Bounded, C, N> {
Expand Down
3 changes: 0 additions & 3 deletions hydroflow_plus_test/src/cluster/compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ pub fn compute_pi(flow: &FlowBuilder, batch_size: usize) -> (Cluster<Worker>, Pr

trials
.send_bincode_interleaved(&process)
.tick_batch()
.persist()
.reduce(q!(|(inside, total), (inside_batch, total_batch)| {
*inside += inside_batch;
*total += total_batch;
}))
.all_ticks()
.sample_every(q!(Duration::from_secs(1)))
.for_each(q!(|(inside, total)| {
println!(
Expand Down
Loading

0 comments on commit 82de6f5

Please sign in to comment.