Skip to content

Commit

Permalink
fix(hydroflow_lang)!: remove singleton persist insertion
Browse files Browse the repository at this point in the history
Also enables singletons for `persist()` and ensures that only the
`'static` lifetime is used
  • Loading branch information
MingweiSamuel committed Jul 3, 2024
1 parent 67c0e51 commit 7bbe654
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 357 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,"Droid Sans Mono",Inconsolata,"Courier New",monospace", style=filled];
Expand All @@ -12,28 +12,26 @@ digraph {
n5v1 [label="(n5v1) tee()", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) identity::<Max<_>>()", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) cross_join()", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) filter(|(value, max_of_stream2)| { value <= max_of_stream2.as_reveal_ref() })", shape=invhouse, fillcolor="#88aaff"]
n10v1 [label="(n10v1) map(|(x, _max)| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n11v1 [label="(n11v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n12v1 [label="(n12v1) map(|x: Max<_>| (context.current_tick(), x.into_reveal()))", shape=house, fillcolor="#ffff88"]
n13v1 [label="(n13v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n8v1 [label="(n8v1) filter(|(value, max_of_stream2)| { value <= max_of_stream2.as_reveal_ref() })", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) map(|(x, _max)| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n10v1 [label="(n10v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n11v1 [label="(n11v1) map(|x: Max<_>| (context.current_tick(), x.into_reveal()))", shape=house, fillcolor="#ffff88"]
n12v1 [label="(n12v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n13v1 [label="(n13v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n14v1 [label="(n14v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n15v1 [label="(n15v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n3v1 [color=darkgreen, style=dashed]
n4v1 -> n5v1 [color=darkgreen, style=bold]
n3v1 -> n14v1 [color=darkgreen, style=dashed]
n3v1 -> n13v1 [color=darkgreen, style=dashed]
n1v1 -> n7v1 [label="0"]
n6v1 -> n7v1 [label="1"]
n5v1 -> n15v1 [color=darkgreen, style=bold]
n10v1 -> n11v1
n5v1 -> n14v1 [color=darkgreen, style=bold]
n9v1 -> n10v1
n8v1 -> n9v1
n7v1 -> n8v1
n12v1 -> n13v1 [color=darkgreen, style=bold]
n5v1 -> n12v1 [color=darkgreen, style=bold]
n14v1 -> n4v1 [color=red, style=dashed]
n15v1 -> n6v1 [color=darkgreen, style=bold]
n11v1 -> n12v1 [color=darkgreen, style=bold]
n5v1 -> n11v1 [color=darkgreen, style=bold]
n13v1 -> n4v1 [color=red, style=dashed]
n14v1 -> n6v1 [color=darkgreen, style=bold]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
Expand All @@ -56,14 +54,12 @@ digraph {
n8v1
n9v1
n10v1
n11v1
subgraph "cluster_sg_2v1_var_filtered_stream2" {
label="var filtered_stream2"
n7v1
n8v1
n9v1
n10v1
n11v1
}
subgraph "cluster_sg_2v1_var_stream1" {
label="var stream1"
Expand All @@ -76,8 +72,8 @@ digraph {
label = "sg_3v1\nstratum 0"
n4v1
n5v1
n11v1
n12v1
n13v1
subgraph "cluster_sg_3v1_var_max_of_stream2" {
label="var max_of_stream2"
n4v1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
Expand All @@ -15,28 +15,26 @@ linkStyle default stroke:#aaa
5v1[/"(5v1) <code>tee()</code>"\]:::pushClass
6v1[\"(6v1) <code>identity::&lt;Max&lt;_&gt;&gt;()</code>"/]:::pullClass
7v1[\"(7v1) <code>cross_join()</code>"/]:::pullClass
8v1[\"(8v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
9v1[\"(9v1) <code>filter(|(value, max_of_stream2)| { value &lt;= max_of_stream2.as_reveal_ref() })</code>"/]:::pullClass
10v1[\"(10v1) <code>map(|(x, _max)| (context.current_tick(), x))</code>"/]:::pullClass
11v1[/"(11v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
12v1[/"(12v1) <code>map(|x: Max&lt;_&gt;| (context.current_tick(), x.into_reveal()))</code>"\]:::pushClass
13v1[/"(13v1) <code>for_each(|x| max_send.send(x).unwrap())</code>"\]:::pushClass
8v1[\"(8v1) <code>filter(|(value, max_of_stream2)| { value &lt;= max_of_stream2.as_reveal_ref() })</code>"/]:::pullClass
9v1[\"(9v1) <code>map(|(x, _max)| (context.current_tick(), x))</code>"/]:::pullClass
10v1[/"(10v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
11v1[/"(11v1) <code>map(|x: Max&lt;_&gt;| (context.current_tick(), x.into_reveal()))</code>"\]:::pushClass
12v1[/"(12v1) <code>for_each(|x| max_send.send(x).unwrap())</code>"\]:::pushClass
13v1["(13v1) <code>handoff</code>"]:::otherClass
14v1["(14v1) <code>handoff</code>"]:::otherClass
15v1["(15v1) <code>handoff</code>"]:::otherClass
2v1-.->3v1; linkStyle 0 stroke:#060
4v1==>5v1; linkStyle 1 stroke:#060
3v1-.->14v1; linkStyle 2 stroke:#060
3v1-.->13v1; linkStyle 2 stroke:#060
1v1-->|0|7v1
6v1-->|1|7v1
5v1==>15v1; linkStyle 5 stroke:#060
10v1-->11v1
5v1==>14v1; linkStyle 5 stroke:#060
9v1-->10v1
8v1-->9v1
7v1-->8v1
12v1==>13v1; linkStyle 10 stroke:#060
5v1==>12v1; linkStyle 11 stroke:#060
14v1-.->4v1; linkStyle 12 stroke:#060
15v1==>6v1; linkStyle 13 stroke:#060
11v1==>12v1; linkStyle 9 stroke:#060
5v1==>11v1; linkStyle 10 stroke:#060
13v1-.->4v1; linkStyle 11 stroke:#060
14v1==>6v1; linkStyle 12 stroke:#060
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
3v1
Expand All @@ -52,13 +50,11 @@ subgraph sg_2v1 ["sg_2v1 stratum 0"]
8v1
9v1
10v1
11v1
subgraph sg_2v1_var_filtered_stream2 ["var <tt>filtered_stream2</tt>"]
7v1
8v1
9v1
10v1
11v1
end
subgraph sg_2v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
Expand All @@ -67,8 +63,8 @@ end
subgraph sg_3v1 ["sg_3v1 stratum 0"]
4v1
5v1
11v1
12v1
13v1
subgraph sg_3v1_var_max_of_stream2 ["var <tt>max_of_stream2</tt>"]
4v1
5v1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_dot(&Default::default())"
expression: "df.meta_graph().unwrap().to_dot(& Default :: default())"
---
digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter(1..=10)", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) source_iter(3..=5)", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) fold(|| 0, |a, b| *a = std::cmp::max(*a, b))", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) filter(|&value| { value <= max_of_stream2 })", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n7v1 [label="(n7v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n8v1 [label="(n8v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n9v1 [label="(n9v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n10v1
n6v1 -> n7v1
n4v1 [label="(n4v1) filter(|&value| { value <= max_of_stream2 })", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n6v1 [label="(n6v1) for_each(|x| filter_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n7v1 [label="(n7v1) map(|x| (context.current_tick(), x))", shape=invhouse, fillcolor="#88aaff"]
n8v1 [label="(n8v1) for_each(|x| max_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
n2v1 -> n9v1
n5v1 -> n6v1
n4v1 -> n5v1
n1v1 -> n4v1
n8v1 -> n9v1
n3v1 -> n8v1
n10v1 -> n3v1 [color=red]
n3v1 -> n5v1 [color=red]
n7v1 -> n8v1
n3v1 -> n7v1
n9v1 -> n3v1 [color=red]
n3v1 -> n4v1 [color=red]
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
Expand All @@ -42,13 +40,11 @@ digraph {
n4v1
n5v1
n6v1
n7v1
subgraph "cluster_sg_2v1_var_filtered_stream1" {
label="var filtered_stream1"
n4v1
n5v1
n6v1
n7v1
}
subgraph "cluster_sg_2v1_var_stream1" {
label="var stream1"
Expand All @@ -60,8 +56,8 @@ digraph {
style=filled
label = "sg_3v1\nstratum 1"
n3v1
n7v1
n8v1
n9v1
subgraph "cluster_sg_3v1_var_max_of_stream2" {
label="var max_of_stream2"
n3v1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
source: hydroflow/tests/surface_singleton.rs
expression: "df.meta_graph().unwrap().to_mermaid(&Default::default())"
expression: "df.meta_graph().unwrap().to_mermaid(& Default :: default())"
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
Expand All @@ -11,22 +11,20 @@ linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter(1..=10)</code>"/]:::pullClass
2v1[\"(2v1) <code>source_iter(3..=5)</code>"/]:::pullClass
3v1[\"(3v1) <code>fold(|| 0, |a, b| *a = std::cmp::max(*a, b))</code>"/]:::pullClass
4v1[\"(4v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
5v1[\"(5v1) <code>filter(|&amp;value| { value &lt;= max_of_stream2 })</code>"/]:::pullClass
6v1[\"(6v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
7v1[/"(7v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
8v1[\"(8v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
9v1[/"(9v1) <code>for_each(|x| max_send.send(x).unwrap())</code>"\]:::pushClass
10v1["(10v1) <code>handoff</code>"]:::otherClass
2v1-->10v1
6v1-->7v1
4v1[\"(4v1) <code>filter(|&amp;value| { value &lt;= max_of_stream2 })</code>"/]:::pullClass
5v1[\"(5v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
6v1[/"(6v1) <code>for_each(|x| filter_send.send(x).unwrap())</code>"\]:::pushClass
7v1[\"(7v1) <code>map(|x| (context.current_tick(), x))</code>"/]:::pullClass
8v1[/"(8v1) <code>for_each(|x| max_send.send(x).unwrap())</code>"\]:::pushClass
9v1["(9v1) <code>handoff</code>"]:::otherClass
2v1-->9v1
5v1-->6v1
4v1-->5v1
1v1-->4v1
8v1-->9v1
3v1-->8v1
10v1--x3v1; linkStyle 7 stroke:red
3v1--x5v1; linkStyle 8 stroke:red
7v1-->8v1
3v1-->7v1
9v1--x3v1; linkStyle 6 stroke:red
3v1--x4v1; linkStyle 7 stroke:red
subgraph sg_1v1 ["sg_1v1 stratum 0"]
2v1
subgraph sg_1v1_var_stream2 ["var <tt>stream2</tt>"]
Expand All @@ -38,21 +36,19 @@ subgraph sg_2v1 ["sg_2v1 stratum 2"]
4v1
5v1
6v1
7v1
subgraph sg_2v1_var_filtered_stream1 ["var <tt>filtered_stream1</tt>"]
4v1
5v1
6v1
7v1
end
subgraph sg_2v1_var_stream1 ["var <tt>stream1</tt>"]
1v1
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
3v1
7v1
8v1
9v1
subgraph sg_3v1_var_max_of_stream2 ["var <tt>max_of_stream2</tt>"]
3v1
end
Expand Down

This file was deleted.

Loading

0 comments on commit 7bbe654

Please sign in to comment.