Skip to content

Commit

Permalink
refactor(hydroflow_lang)!: require lifetime on perist*() operators (#…
Browse files Browse the repository at this point in the history
…1295)

BREAKING CHANGE: `persist()` is no longer valid, use `persist::<'static>()` instead

Future PR will add `persist::<'tick>()`
  • Loading branch information
MingweiSamuel authored and jhellerstein committed Jul 16, 2024
1 parent 7a1126c commit 482d57e
Show file tree
Hide file tree
Showing 97 changed files with 240 additions and 230 deletions.
4 changes: 2 additions & 2 deletions benches/benches/micro_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ fn ops(c: &mut Criterion) {
const DATA: [u64; 1024] = [0; 1024];

let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
source_iter(black_box(DATA)) -> persist::<'static>()
-> map(black_box)
-> defer_tick()
-> map(black_box)
Expand Down Expand Up @@ -307,7 +307,7 @@ fn ops(c: &mut Criterion) {
const DATA: [[u8; 8192]; 1] = [[0; 8192]; 1];

let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
source_iter(black_box(DATA)) -> persist::<'static>()
-> defer_tick()
-> map(black_box)
-> defer_tick()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ async fn main() {

let mut df = datalog!(
r#"
.input clientIn `source_iter([("vote".to_string(),),]) -> persist()`
.input clientIn `source_iter([("vote".to_string(),),]) -> persist::<'static>()`
.output clientOut `for_each(|(i,msg):(u32,String,)| println!("committed {:?}: {:?}", i, msg))`
# EDBs
.input startIndex `source_iter([(1u32,),]) -> persist()`
.input participants `source_iter(peers.clone()) -> map(|p| (p,)) -> persist()`
.input success `source_iter([(true,),]) -> persist()`
.input reject `source_iter([(false,),]) -> persist()`
.input commitInstruct `source_iter([(true,),]) -> persist()`
.input rollbackInstruct `source_iter([(false,),]) -> persist()`
.input startIndex `source_iter([(1u32,),]) -> persist::<'static>()`
.input participants `source_iter(peers.clone()) -> map(|p| (p,)) -> persist::<'static>()`
.input success `source_iter([(true,),]) -> persist::<'static>()`
.input reject `source_iter([(false,),]) -> persist::<'static>()`
.input commitInstruct `source_iter([(true,),]) -> persist::<'static>()`
.input rollbackInstruct `source_iter([(false,),]) -> persist::<'static>()`
.async voteToParticipant `map(|(node_id, v):(u32,(u32,String))| (node_id, serialize_to_bytes(v))) -> dest_sink(vote_to_participant_sink)` `null::<(u32,String,)>()`
.async voteFromParticipant `null::<(u32,String,bool,u32,)>()` `source_stream(vote_from_participant_source) -> map(|v| deserialize_from_bytes::<(u32,String,bool,u32,)>(v.unwrap()).unwrap())`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ async fn main() {

let mut df = datalog!(
r#"
.input myID `source_iter(my_id.clone()) -> persist() -> map(|p| (p,))`
.input coordinator `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input verdict `source_iter([(true,),]) -> persist()`
.input myID `source_iter(my_id.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input coordinator `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input verdict `source_iter([(true,),]) -> persist::<'static>()`
// .output voteOut `for_each(|(i,myID):(u32,u32,)| println!("participant {:?}: message {:?}", myID, i))`
.async voteToParticipant `null::<(u32,String,)>()` `source_stream(vote_to_participant_source) -> map(|x| deserialize_from_bytes::<(u32,String,)>(x.unwrap()).unwrap())`
.async voteFromParticipant `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(vote_from_participant_sink)` `null::<(u32,String,)>()`
.async instructToParticipant `null::<(u32,String,bool,)>()` `source_stream(instruct_to_participant_source) -> map(|x| deserialize_from_bytes::<(u32,String,bool,)>(x.unwrap()).unwrap())`
.async ackFromParticipant `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(ack_from_participant_sink)` `null::<(u32,String,u32,)>()`
# .output verdictRequest
# .output verdictRequest
# .output log
# verdictRequest(i, msg) :- voteToParticipant(i, msg)
voteFromParticipant@addr(i, msg, res, l_from) :~ voteToParticipant(i, msg), coordinator(addr), myID(l_from), verdict(res)
ackFromParticipant@addr(i, msg, l_from) :~ instructToParticipant(i, msg, b), coordinator(addr), myID(l_from)
// voteOut(i, l) :- voteToParticipant(i, msg), myID(l)
# log(i, msg, type) :- instructToParticipant(i, msg, type) # the log channel will sort everything out
"#
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() {
r#"
.input repeated `spin() -> flat_map(|_| to_repeat.iter().cloned())`
.input periodic `source_stream(periodic) -> map(|_| ())`
.input peers `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input peers `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(broadcast_sink)` `null::<(String,)>()`
broadcast@n(x) :~ repeated(x), periodic(), peers(n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ async fn main() {

let mut df = datalog!(
r#"
.input clientIn `source_iter([("vote".to_string(),),]) -> persist()`
.input clientIn `source_iter([("vote".to_string(),),]) -> persist::<'static>()`
.output stdout `for_each(|_:(String,)| println!("voted"))`
.input replicas `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input replicas `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async voteToReplica `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(to_replica_sink)` `null::<(String,)>()`
.async voteFromReplica `null::<(u32,String,)>()` `source_stream(from_replica_source) -> map(|v| deserialize_from_bytes::<(u32,String,)>(v.unwrap()).unwrap())`
voteToReplica@addr(v) :~ clientIn(v), replicas(addr)
allVotes(s, v) :- voteFromReplica(s, v)
allVotes(s, v) :+ allVotes(s, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ async fn main() {

let mut df = datalog!(
r#"
.input myID `source_iter(my_id.clone()) -> persist() -> map(|p| (p,))`
.input leader `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input myID `source_iter(my_id.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input leader `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async voteToReplica `null::<(String,)>()` `source_stream(to_replica_source) -> map(|x| deserialize_from_bytes::<(String,)>(x.unwrap()).unwrap())`
.async voteFromReplica `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(from_replica_sink)` `null::<(u32,String,)>()`
voteFromReplica@addr(i, v) :~ voteToReplica(v), leader(addr), myID(i)
"#
);
Expand Down
22 changes: 11 additions & 11 deletions hydro_deploy/hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() {
util::ws_server(ws_port).await;

let df = hydroflow_syntax! {
all_peers = source_iter((0..number_of_nodes).filter(move |&i| i != self_node_id)) -> persist();
all_peers = source_iter((0..number_of_nodes).filter(move |&i| i != self_node_id)) -> persist::<'static>();

// networking
from_peer = source_stream(from_peer) -> map(|b| deserialize_from_bytes::<PeerMessage>(b.unwrap()).unwrap());
Expand All @@ -68,16 +68,16 @@ async fn main() {

// helpers
peer_broadcast = cross_join::<'tick, 'tick, HalfMultisetJoinState>() -> to_peer;
all_peers -> [0] peer_broadcast;
to_peers = [1] peer_broadcast;
all_peers -> [0]peer_broadcast;
to_peers = [1]peer_broadcast;

names = from_client ->
filter_map(|(client, msg)| if let FromClient::Name(name) = msg { Some((client, name)) } else { None });
messages = from_client ->
filter_map(|(client, msg)| if let FromClient::Message { id, text } = msg { Some((client, (id, text))) } else { None });

clients_connect -> persist() -> [pos] active_clients;
clients_disconnect -> persist() -> [neg] active_clients;
clients_connect -> persist::<'static>() -> [pos]active_clients;
clients_disconnect -> persist::<'static>() -> [neg]active_clients;
active_clients = difference() -> null();

// logic
Expand All @@ -88,8 +88,8 @@ async fn main() {
// })) -> to_client;

// replicated chat
messages -> [0] local_messages;
names -> persist() -> [1] local_messages;
messages -> [0]local_messages;
names -> persist::<'static>() -> [1]local_messages;
local_messages = join::<'tick, 'tick, HalfMultisetJoinState>() -> tee();

local_messages -> map(|(client_id, ((msg_id, text), name))| (ChatMessage {
Expand All @@ -108,12 +108,12 @@ async fn main() {

from_peer -> map(|p| (p.msg, p.node_id, p.client_id, p.msg_id)) -> all_messages;

all_messages = union() /* -> persist() -> (PATCH 2) */ -> unique::<'tick>() -> map(|t| t.0);
all_messages = union() /* -> persist::<'static>() -> (PATCH 2) */ -> unique::<'tick>() -> map(|t| t.0);

broadcast_clients = cross_join::<'static /*'tick (PATCH 1) */, 'static /*'tick, HalfMultisetJoinState (PATCH 2) */>() -> multiset_delta() -> to_client;
// active_clients -> [0] broadcast_clients; (PATCH 1)
clients_connect -> [0] broadcast_clients;
all_messages -> [1] broadcast_clients;
// active_clients -> [0]broadcast_clients; (PATCH 1)
clients_connect -> [0]broadcast_clients;
all_messages -> [1]broadcast_clients;
};

hydroflow::util::cli::launch_flow(df).await;
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/chat/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) async fn run_client(opts: Opts) {
message: l.unwrap(),
ts: Utc::now()})
-> [input]msg_send;
inbound_chan[ConnectResponse] -> persist() -> [signal]msg_send;
inbound_chan[ConnectResponse] -> persist::<'static>() -> [signal]msg_send;
msg_send = defer_signal() -> map(|msg| (msg, server_addr)) -> [1]outbound_chan;

// receive and print messages
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/deadlock_detector/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ pub(crate) async fn run_detector(opts: Opts, peer_list: Vec<String>) {
// setup gossip channel to all peers. gen_bool chooses True with the odds passed in.
gossip_join = cross_join::<'tick>()
-> filter(|_| gen_bool(0.8)) -> outbound_chan;
gossip = map(identity) -> persist() -> [0]gossip_join;
peers[1] -> persist() -> [1]gossip_join;
gossip = map(identity) -> persist::<'static>() -> [0]gossip_join;
peers[1] -> persist::<'static>() -> [1]gossip_join;
peers[2] -> for_each(|s| println!("Peer: {:?}", s));

// prompt for input
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/kvs_mut/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
],
}
})
-> persist_mut_keyed()
-> persist_mut_keyed::<'static>()
-> [0]lookup;
gets -> [1]lookup;
// Join PUTs and GETs by key, persisting the PUTs.
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/kvs_replicated/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts

// Join PUTs and GETs by key
writes -> map(|(key, value, _addr)| (key, value)) -> writes_store;
writes_store = persist() -> tee();
writes_store = persist::<'static>() -> tee();
writes_store -> [0]lookup;
gets -> [1]lookup;
lookup = join();
Expand All @@ -52,7 +52,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
// Outbound gossip. Send updates to peers.
peers -> peer_store;
source_iter_delta(peer_server) -> peer_store;
peer_store = union() -> persist();
peer_store = union() -> persist::<'static>();
writes -> [0]outbound_gossip;
peer_store -> [1]outbound_gossip;
outbound_gossip = cross_join()
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
// 2. coordinator send final decision, subordinates ACK
// 3. coordinate sends END, subordinates respond with ENDED
// After phase 3 we delete the xid from the phase_map
phase_map = union() -> persist_mut_keyed();
phase_map = union() -> persist_mut_keyed::<'static>();

// set up channels
outbound_chan = tee();
Expand Down
6 changes: 5 additions & 1 deletion hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ impl<'a> Hydroflow<'a> {

let mut op_inst_diagnostics = Vec::new();
meta_graph.insert_node_op_insts_all(&mut op_inst_diagnostics);
assert!(op_inst_diagnostics.is_empty());
assert!(
op_inst_diagnostics.is_empty(),
"Expected no diagnostics, got: {:#?}",
op_inst_diagnostics
);

assert!(self.meta_graph.replace(meta_graph).is_none());
}
Expand Down
13 changes: 4 additions & 9 deletions hydroflow/tests/compile-fail/surface_singleton_badexpr.stderr
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
error[E0308]: mismatched types
--> tests/compile-fail/surface_singleton_badexpr.rs:7:40
|
7 | -> filter(|value| value <= #my_ref)
| ^^^^^^^ expected `&u32`, found integer
|
help: consider removing deref here
|
7 | -> filter(|value| value <= #my_ref)
error: `persist` should have exactly 1 persistence lifetime arguments, actually has 0.
--> tests/compile-fail/surface_singleton_badexpr.rs:6:16
|
6 | -> persist()
| ^^^^^^^
2 changes: 1 addition & 1 deletion hydroflow/tests/compile-fail/surface_singleton_nostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #my_ref.as_reveal_ref())
-> null();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #unknown.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([()])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) for_each(|_| time.set(Some(context.current_tick_start().elapsed().unwrap())))", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
n1v1 -> n2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([()])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[/"(3v1) <code>for_each(|_| time.set(Some(context.current_tick_start().elapsed().unwrap())))</code>"\]:::pushClass
2v1-->3v1
1v1-->2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([1, 2, 3])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) for_each(|v| pull_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) persist()", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) persist::<'static>()", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) for_each(|v| push_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
n1v1 -> n2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([1, 2, 3])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[/"(3v1) <code>tee()</code>"\]:::pushClass
4v1[/"(4v1) <code>for_each(|v| pull_tx.send(v).unwrap())</code>"\]:::pushClass
5v1[/"(5v1) <code>persist()</code>"\]:::pushClass
5v1[/"(5v1) <code>persist::&lt;'static&gt;()</code>"\]:::pushClass
6v1[/"(6v1) <code>for_each(|v| push_tx.send(v).unwrap())</code>"\]:::pushClass
2v1-->3v1
1v1-->2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([1])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([1])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
3v1[\"(3v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[\"(3v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>fold(|| 0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
5v1[/"(5v1) <code>for_each(|x| result_send.send(x).unwrap())</code>"\]:::pushClass
6v1["(6v1) <code>handoff</code>"]:::otherClass
Expand Down
Loading

0 comments on commit 482d57e

Please sign in to comment.