Skip to content

Commit

Permalink
feat: add defer() which is the same as defer_tick() except that it is…
Browse files Browse the repository at this point in the history
… lazy (#945)
  • Loading branch information
zzlk authored Oct 17, 2023
1 parent ff4bddd commit 7df0a0d
Show file tree
Hide file tree
Showing 35 changed files with 399 additions and 33 deletions.
16 changes: 13 additions & 3 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ impl Hydroflow {
sg_data.last_tick_run_in = Some(current_tick);
}

for &handoff_id in self.subgraphs[sg_id.0].succs.iter() {
let sg_data = &self.subgraphs[sg_id.0];

for &handoff_id in sg_data.succs.iter() {
let handoff = &self.handoffs[handoff_id.0];
if !handoff.handoff.is_bottom() {
for &succ_id in handoff.succs.iter() {
let succ_sg_data = &self.subgraphs[succ_id.0];
// If we have sent data to the next tick, then we can start the next tick.
if succ_sg_data.stratum < self.context.current_stratum {
if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy {
self.can_start_tick = true;
}
// Add subgraph to stratum queue if it is not already scheduled.
Expand Down Expand Up @@ -489,7 +491,7 @@ impl Hydroflow {
W: 'static + PortList<SEND>,
F: 'static + for<'ctx> FnMut(&'ctx mut Context, R::Ctx<'ctx>, W::Ctx<'ctx>),
{
self.add_subgraph_stratified(name, 0, recv_ports, send_ports, subgraph)
self.add_subgraph_stratified(name, 0, recv_ports, send_ports, false, subgraph)
}

/// Adds a new compiled subgraph with the specified inputs, outputs, and stratum number.
Expand All @@ -501,6 +503,7 @@ impl Hydroflow {
stratum: usize,
recv_ports: R,
send_ports: W,
laziness: bool,
mut subgraph: F,
) -> SubgraphId
where
Expand All @@ -527,6 +530,7 @@ impl Hydroflow {
subgraph_preds,
subgraph_succs,
true,
laziness,
));
self.init_stratum(stratum);
self.stratum_queues[stratum].push_back(sg_id);
Expand Down Expand Up @@ -618,6 +622,7 @@ impl Hydroflow {
subgraph_preds,
subgraph_succs,
true,
false,
));
self.init_stratum(stratum);
self.stratum_queues[stratum].push_back(sg_id);
Expand Down Expand Up @@ -760,6 +765,9 @@ pub(super) struct SubgraphData {

/// Keep track of the last tick that this subgraph was run in
last_tick_run_in: Option<usize>,

/// If this subgraph is marked as lazy, then sending data back to a lower stratum does not trigger a new tick to be run.
is_lazy: bool,
}
impl SubgraphData {
pub fn new(
Expand All @@ -769,6 +777,7 @@ impl SubgraphData {
preds: Vec<HandoffId>,
succs: Vec<HandoffId>,
is_scheduled: bool,
laziness: bool,
) -> Self {
Self {
name,
Expand All @@ -778,6 +787,7 @@ impl SubgraphData {
succs,
is_scheduled: Cell::new(is_scheduled),
last_tick_run_in: None,
is_lazy: laziness,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
source: hydroflow/tests/surface_stratum.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) union()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) tee()", shape=house, fillcolor="#ffff88"]
n3v1 [label="(n3v1) source_iter([1, 3])", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) defer_tick_lazy()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|x| 2 * x)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| output_inner.borrow_mut().push(x))", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n9v1 [label="(n9v1) identity()", shape=invhouse, fillcolor="#88aaff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n1v1 -> n2v1
n3v1 -> n1v1 [label="0"]
n5v1 -> n1v1 [label="1"]
n4v1 -> n7v1
n2v1 -> n8v1 [label="0"]
n2v1 -> n6v1 [label="1"]
n7v1 -> n5v1
n8v1 -> n9v1
n9v1 -> n10v1
n10v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n4v1
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n3v1
n5v1
n1v1
n2v1
n6v1
subgraph "cluster_sg_2v1_var_a" {
label="var a"
n1v1
n2v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 1"
n9v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
source: hydroflow/tests/surface_stratum.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>union()</code>"/]:::pullClass
2v1[/"(2v1) <code>tee()</code>"\]:::pushClass
3v1[\"(3v1) <code>source_iter([1, 3])</code>"/]:::pullClass
4v1[\"(4v1) <code>defer_tick_lazy()</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|x| 2 * x)</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| output_inner.borrow_mut().push(x))</code>"\]:::pushClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
8v1["(8v1) <code>handoff</code>"]:::otherClass
9v1[\"(9v1) <code>identity()</code>"/]:::pullClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
1v1-->2v1
3v1-->|0|1v1
5v1-->|1|1v1
4v1-->7v1
2v1-->|0|8v1
2v1-->|1|6v1
7v1-->5v1
8v1-->9v1
9v1-->10v1
10v1--o4v1; linkStyle 9 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
4v1
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
3v1
5v1
1v1
2v1
6v1
subgraph sg_2v1_var_a ["var <tt>a</tt>"]
1v1
2v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
9v1
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
source: hydroflow/tests/surface_stratum.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) union()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) tee()", shape=house, fillcolor="#ffff88"]
n3v1 [label="(n3v1) source_iter([1, 3])", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) defer()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|x| 2 * x)", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| output_inner.borrow_mut().push(x))", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n9v1 [label="(n9v1) identity()", shape=invhouse, fillcolor="#88aaff"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n1v1 -> n2v1
n3v1 -> n1v1 [label="0"]
n5v1 -> n1v1 [label="1"]
n4v1 -> n7v1
n2v1 -> n8v1 [label="0"]
n2v1 -> n6v1 [label="1"]
n7v1 -> n5v1
n8v1 -> n9v1
n9v1 -> n10v1
n10v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n4v1
}
subgraph "cluster n2v1" {
fillcolor="#dddddd"
style=filled
label = "sg_2v1\nstratum 0"
n3v1
n5v1
n1v1
n2v1
n6v1
subgraph "cluster_sg_2v1_var_a" {
label="var a"
n1v1
n2v1
}
}
subgraph "cluster n3v1" {
fillcolor="#dddddd"
style=filled
label = "sg_3v1\nstratum 1"
n9v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
source: hydroflow/tests/surface_stratum.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>union()</code>"/]:::pullClass
2v1[/"(2v1) <code>tee()</code>"\]:::pushClass
3v1[\"(3v1) <code>source_iter([1, 3])</code>"/]:::pullClass
4v1[\"(4v1) <code>defer()</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|x| 2 * x)</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| output_inner.borrow_mut().push(x))</code>"\]:::pushClass
7v1["(7v1) <code>handoff</code>"]:::otherClass
8v1["(8v1) <code>handoff</code>"]:::otherClass
9v1[\"(9v1) <code>identity()</code>"/]:::pullClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
1v1-->2v1
3v1-->|0|1v1
5v1-->|1|1v1
4v1-->7v1
2v1-->|0|8v1
2v1-->|1|6v1
7v1-->5v1
8v1-->9v1
9v1-->10v1
10v1--o4v1; linkStyle 9 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
4v1
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
3v1
5v1
1v1
2v1
6v1
subgraph sg_2v1_var_a ["var <tt>a</tt>"]
1v1
2v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
9v1
end

32 changes: 32 additions & 0 deletions hydroflow/tests/surface_stratum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,35 @@ pub fn test_subgraph_stratum_consolidation() {
df.run_available();
assert!(output.take().is_empty());
}

#[multiplatform_test]
pub fn test_defer_lazy() {
let output = <Rc<RefCell<Vec<usize>>>>::default();
let output_inner = Rc::clone(&output);

// Without `defer()` this would spin forever with run_available().
let mut df: Hydroflow = hydroflow_syntax! {
a = union() -> tee();
source_iter([1, 3]) -> [0]a;
a[0] -> defer_tick_lazy() -> map(|x| 2 * x) -> [1]a;
a[1] -> for_each(|x| output_inner.borrow_mut().push(x));
};
println!(
"{}",
df.meta_graph().unwrap().to_mermaid(&Default::default())
);

assert_graphvis_snapshots!(df);

df.run_available();
assert_eq!(&[1, 3], &*output.take());

df.run_available();
assert_eq!(&[2, 6], &*output.take());

df.run_available();
assert_eq!(&[4, 12], &*output.take());

df.run_available();
assert_eq!(&[8, 24], &*output.take());
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() {
use hydroflow::{var_expr, var_args};
let mut df = hydroflow::scheduled::graph::Hydroflow::new();
df.__assign_meta_graph(
"{\"nodes\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":null,\"version\":2},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":{\"Handoff\":{}},\"version\":3},{\"value\":{\"Operator\":\"source_stream (ints)\"},\"version\":1},{\"value\":{\"Operator\":\"for_each (| v | result . send (v) . unwrap ())\"},\"version\":1},{\"value\":{\"Operator\":\"map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)))\"},\"version\":1},{\"value\":{\"Operator\":\"fold_keyed :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; })\"},\"version\":1},{\"value\":{\"Operator\":\"map (| (g , a) : ((_ ,) , _) | (g . 0 , a . 0 . unwrap () ,))\"},\"version\":1}],\"graph\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1}],\"version\":3},{\"value\":[{\"idx\":6,\"version\":3},{\"idx\":10,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":9,\"version\":1},{\"idx\":6,\"version\":3}],\"version\":3},{\"value\":[{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":3}],\"ports\":[{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":1},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3}],\"node_subgraph\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1}],\"subgraph_nodes\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":1}],\"subgraph_stratum\":[{\"value\":null,\"version\":0},{\"value\":1,\"version\":1},{\"value\":0,\"version\":1}],\"node_varnames\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"ints_insert\",\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"result_insert\",\"version\":1}],\"flow_props\":[{\"value\":null,\"version\":0}]}",
"{\"nodes\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":null,\"version\":2},{\"value\":null,\"version\":2},{\"value\":{\"Operator\":\"unique :: < 'tick > ()\"},\"version\":1},{\"value\":{\"Handoff\":{}},\"version\":3},{\"value\":{\"Operator\":\"source_stream (ints)\"},\"version\":1},{\"value\":{\"Operator\":\"for_each (| v | result . send (v) . unwrap ())\"},\"version\":1},{\"value\":{\"Operator\":\"map (| row : (_ , _ ,) | ((row . 0 % 2 ,) , (row . 1 ,)))\"},\"version\":1},{\"value\":{\"Operator\":\"fold_keyed :: < 'tick , (_ ,) , (Option < _ > ,) > (| | (None ,) , | old : & mut (Option < _ > ,) , val : (_ ,) | { old . 0 = if let Some (prev) = old . 0 . take () { Some (prev + val . 0) } else { Some (val . 0) } ; })\"},\"version\":1},{\"value\":{\"Operator\":\"map (| (g , a) : ((_ ,) , _) | (g . 0 , a . 0 . unwrap () ,))\"},\"version\":1}],\"graph\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1}],\"version\":3},{\"value\":[{\"idx\":6,\"version\":3},{\"idx\":10,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":3},{\"value\":null,\"version\":2},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":9,\"version\":1},{\"idx\":6,\"version\":3}],\"version\":3},{\"value\":[{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":3}],\"ports\":[{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":null,\"version\":0},{\"value\":[\"Elided\",\"Elided\"],\"version\":1},{\"value\":[\"Elided\",\"Elided\"],\"version\":3},{\"value\":[\"Elided\",\"Elided\"],\"version\":3}],\"node_subgraph\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":null,\"version\":0},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":2,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1},{\"value\":{\"idx\":1,\"version\":1},\"version\":1}],\"subgraph_nodes\":[{\"value\":null,\"version\":0},{\"value\":[{\"idx\":10,\"version\":1},{\"idx\":11,\"version\":1},{\"idx\":5,\"version\":1},{\"idx\":8,\"version\":1}],\"version\":1},{\"value\":[{\"idx\":7,\"version\":1},{\"idx\":2,\"version\":1},{\"idx\":9,\"version\":1}],\"version\":1}],\"subgraph_stratum\":[{\"value\":null,\"version\":0},{\"value\":1,\"version\":1},{\"value\":0,\"version\":1}],\"node_varnames\":[{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"ints_insert\",\"version\":1},{\"value\":null,\"version\":0},{\"value\":null,\"version\":0},{\"value\":\"result_insert\",\"version\":1}],\"flow_props\":[{\"value\":null,\"version\":0}],\"subgraph_laziness\":[{\"value\":null,\"version\":0}]}",
);
df.__assign_diagnostics("[]");
let (hoff_6v3_send, hoff_6v3_recv) = df
Expand Down Expand Up @@ -46,6 +46,7 @@ fn main() {
0,
var_expr!(),
var_expr!(hoff_6v3_send),
false,
move |context, var_args!(), var_args!(hoff_6v3_send)| {
let hoff_6v3_send = hydroflow::pusherator::for_each::ForEach::new(|
v|
Expand Down Expand Up @@ -203,6 +204,7 @@ fn main() {
1,
var_expr!(hoff_6v3_recv),
var_expr!(),
false,
move |context, var_args!(hoff_6v3_recv), var_args!()| {
let mut hoff_6v3_recv = hoff_6v3_recv.borrow_mut_swap();
let hoff_6v3_recv = hoff_6v3_recv.drain(..);
Expand Down
Loading

0 comments on commit 7df0a0d

Please sign in to comment.