Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(hydroflow_lang)!: require lifetime on perist*() operators #1295

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/benches/micro_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ fn ops(c: &mut Criterion) {
const DATA: [u64; 1024] = [0; 1024];

let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
source_iter(black_box(DATA)) -> persist::<'static>()
-> map(black_box)
-> defer_tick()
-> map(black_box)
Expand Down Expand Up @@ -307,7 +307,7 @@ fn ops(c: &mut Criterion) {
const DATA: [[u8; 8192]; 1] = [[0; 8192]; 1];

let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
source_iter(black_box(DATA)) -> persist::<'static>()
-> defer_tick()
-> map(black_box)
-> defer_tick()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ async fn main() {

let mut df = datalog!(
r#"
.input clientIn `source_iter([("vote".to_string(),),]) -> persist()`
.input clientIn `source_iter([("vote".to_string(),),]) -> persist::<'static>()`
.output clientOut `for_each(|(i,msg):(u32,String,)| println!("committed {:?}: {:?}", i, msg))`

# EDBs
.input startIndex `source_iter([(1u32,),]) -> persist()`
.input participants `source_iter(peers.clone()) -> map(|p| (p,)) -> persist()`
.input success `source_iter([(true,),]) -> persist()`
.input reject `source_iter([(false,),]) -> persist()`
.input commitInstruct `source_iter([(true,),]) -> persist()`
.input rollbackInstruct `source_iter([(false,),]) -> persist()`
.input startIndex `source_iter([(1u32,),]) -> persist::<'static>()`
.input participants `source_iter(peers.clone()) -> map(|p| (p,)) -> persist::<'static>()`
.input success `source_iter([(true,),]) -> persist::<'static>()`
.input reject `source_iter([(false,),]) -> persist::<'static>()`
.input commitInstruct `source_iter([(true,),]) -> persist::<'static>()`
.input rollbackInstruct `source_iter([(false,),]) -> persist::<'static>()`

.async voteToParticipant `map(|(node_id, v):(u32,(u32,String))| (node_id, serialize_to_bytes(v))) -> dest_sink(vote_to_participant_sink)` `null::<(u32,String,)>()`
.async voteFromParticipant `null::<(u32,String,bool,u32,)>()` `source_stream(vote_from_participant_source) -> map(|v| deserialize_from_bytes::<(u32,String,bool,u32,)>(v.unwrap()).unwrap())`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ async fn main() {

let mut df = datalog!(
r#"
.input myID `source_iter(my_id.clone()) -> persist() -> map(|p| (p,))`
.input coordinator `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input verdict `source_iter([(true,),]) -> persist()`
.input myID `source_iter(my_id.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input coordinator `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input verdict `source_iter([(true,),]) -> persist::<'static>()`
// .output voteOut `for_each(|(i,myID):(u32,u32,)| println!("participant {:?}: message {:?}", myID, i))`

.async voteToParticipant `null::<(u32,String,)>()` `source_stream(vote_to_participant_source) -> map(|x| deserialize_from_bytes::<(u32,String,)>(x.unwrap()).unwrap())`
.async voteFromParticipant `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(vote_from_participant_sink)` `null::<(u32,String,)>()`
.async instructToParticipant `null::<(u32,String,bool,)>()` `source_stream(instruct_to_participant_source) -> map(|x| deserialize_from_bytes::<(u32,String,bool,)>(x.unwrap()).unwrap())`
.async ackFromParticipant `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(ack_from_participant_sink)` `null::<(u32,String,u32,)>()`
# .output verdictRequest

# .output verdictRequest
# .output log

# verdictRequest(i, msg) :- voteToParticipant(i, msg)
voteFromParticipant@addr(i, msg, res, l_from) :~ voteToParticipant(i, msg), coordinator(addr), myID(l_from), verdict(res)
ackFromParticipant@addr(i, msg, l_from) :~ instructToParticipant(i, msg, b), coordinator(addr), myID(l_from)
// voteOut(i, l) :- voteToParticipant(i, msg), myID(l)

# log(i, msg, type) :- instructToParticipant(i, msg, type) # the log channel will sort everything out
"#
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() {
r#"
.input repeated `spin() -> flat_map(|_| to_repeat.iter().cloned())`
.input periodic `source_stream(periodic) -> map(|_| ())`
.input peers `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input peers `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(broadcast_sink)` `null::<(String,)>()`
broadcast@n(x) :~ repeated(x), periodic(), peers(n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ async fn main() {

let mut df = datalog!(
r#"
.input clientIn `source_iter([("vote".to_string(),),]) -> persist()`
.input clientIn `source_iter([("vote".to_string(),),]) -> persist::<'static>()`
.output stdout `for_each(|_:(String,)| println!("voted"))`
.input replicas `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input replicas `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async voteToReplica `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(to_replica_sink)` `null::<(String,)>()`
.async voteFromReplica `null::<(u32,String,)>()` `source_stream(from_replica_source) -> map(|v| deserialize_from_bytes::<(u32,String,)>(v.unwrap()).unwrap())`
voteToReplica@addr(v) :~ clientIn(v), replicas(addr)
allVotes(s, v) :- voteFromReplica(s, v)
allVotes(s, v) :+ allVotes(s, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ async fn main() {

let mut df = datalog!(
r#"
.input myID `source_iter(my_id.clone()) -> persist() -> map(|p| (p,))`
.input leader `source_iter(peers.clone()) -> persist() -> map(|p| (p,))`
.input myID `source_iter(my_id.clone()) -> persist::<'static>() -> map(|p| (p,))`
.input leader `source_iter(peers.clone()) -> persist::<'static>() -> map(|p| (p,))`
.async voteToReplica `null::<(String,)>()` `source_stream(to_replica_source) -> map(|x| deserialize_from_bytes::<(String,)>(x.unwrap()).unwrap())`
.async voteFromReplica `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(from_replica_sink)` `null::<(u32,String,)>()`
voteFromReplica@addr(i, v) :~ voteToReplica(v), leader(addr), myID(i)
"#
);
Expand Down
22 changes: 11 additions & 11 deletions hydro_deploy/hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() {
util::ws_server(ws_port).await;

let df = hydroflow_syntax! {
all_peers = source_iter((0..number_of_nodes).filter(move |&i| i != self_node_id)) -> persist();
all_peers = source_iter((0..number_of_nodes).filter(move |&i| i != self_node_id)) -> persist::<'static>();

// networking
from_peer = source_stream(from_peer) -> map(|b| deserialize_from_bytes::<PeerMessage>(b.unwrap()).unwrap());
Expand All @@ -68,16 +68,16 @@ async fn main() {

// helpers
peer_broadcast = cross_join::<'tick, 'tick, HalfMultisetJoinState>() -> to_peer;
all_peers -> [0] peer_broadcast;
to_peers = [1] peer_broadcast;
all_peers -> [0]peer_broadcast;
to_peers = [1]peer_broadcast;

names = from_client ->
filter_map(|(client, msg)| if let FromClient::Name(name) = msg { Some((client, name)) } else { None });
messages = from_client ->
filter_map(|(client, msg)| if let FromClient::Message { id, text } = msg { Some((client, (id, text))) } else { None });

clients_connect -> persist() -> [pos] active_clients;
clients_disconnect -> persist() -> [neg] active_clients;
clients_connect -> persist::<'static>() -> [pos]active_clients;
clients_disconnect -> persist::<'static>() -> [neg]active_clients;
active_clients = difference() -> null();

// logic
Expand All @@ -88,8 +88,8 @@ async fn main() {
// })) -> to_client;

// replicated chat
messages -> [0] local_messages;
names -> persist() -> [1] local_messages;
messages -> [0]local_messages;
names -> persist::<'static>() -> [1]local_messages;
local_messages = join::<'tick, 'tick, HalfMultisetJoinState>() -> tee();

local_messages -> map(|(client_id, ((msg_id, text), name))| (ChatMessage {
Expand All @@ -108,12 +108,12 @@ async fn main() {

from_peer -> map(|p| (p.msg, p.node_id, p.client_id, p.msg_id)) -> all_messages;

all_messages = union() /* -> persist() -> (PATCH 2) */ -> unique::<'tick>() -> map(|t| t.0);
all_messages = union() /* -> persist::<'static>() -> (PATCH 2) */ -> unique::<'tick>() -> map(|t| t.0);

broadcast_clients = cross_join::<'static /*'tick (PATCH 1) */, 'static /*'tick, HalfMultisetJoinState (PATCH 2) */>() -> multiset_delta() -> to_client;
// active_clients -> [0] broadcast_clients; (PATCH 1)
clients_connect -> [0] broadcast_clients;
all_messages -> [1] broadcast_clients;
// active_clients -> [0]broadcast_clients; (PATCH 1)
clients_connect -> [0]broadcast_clients;
all_messages -> [1]broadcast_clients;
};

hydroflow::util::cli::launch_flow(df).await;
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/chat/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) async fn run_client(opts: Opts) {
message: l.unwrap(),
ts: Utc::now()})
-> [input]msg_send;
inbound_chan[ConnectResponse] -> persist() -> [signal]msg_send;
inbound_chan[ConnectResponse] -> persist::<'static>() -> [signal]msg_send;
msg_send = defer_signal() -> map(|msg| (msg, server_addr)) -> [1]outbound_chan;

// receive and print messages
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/deadlock_detector/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ pub(crate) async fn run_detector(opts: Opts, peer_list: Vec<String>) {
// setup gossip channel to all peers. gen_bool chooses True with the odds passed in.
gossip_join = cross_join::<'tick>()
-> filter(|_| gen_bool(0.8)) -> outbound_chan;
gossip = map(identity) -> persist() -> [0]gossip_join;
peers[1] -> persist() -> [1]gossip_join;
gossip = map(identity) -> persist::<'static>() -> [0]gossip_join;
peers[1] -> persist::<'static>() -> [1]gossip_join;
peers[2] -> for_each(|s| println!("Peer: {:?}", s));

// prompt for input
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/kvs_mut/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
],
}
})
-> persist_mut_keyed()
-> persist_mut_keyed::<'static>()
-> [0]lookup;
gets -> [1]lookup;
// Join PUTs and GETs by key, persisting the PUTs.
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/kvs_replicated/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts

// Join PUTs and GETs by key
writes -> map(|(key, value, _addr)| (key, value)) -> writes_store;
writes_store = persist() -> tee();
writes_store = persist::<'static>() -> tee();
writes_store -> [0]lookup;
gets -> [1]lookup;
lookup = join();
Expand All @@ -52,7 +52,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
// Outbound gossip. Send updates to peers.
peers -> peer_store;
source_iter_delta(peer_server) -> peer_store;
peer_store = union() -> persist();
peer_store = union() -> persist::<'static>();
writes -> [0]outbound_gossip;
peer_store -> [1]outbound_gossip;
outbound_gossip = cross_join()
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
// 2. coordinator send final decision, subordinates ACK
// 3. coordinate sends END, subordinates respond with ENDED
// After phase 3 we delete the xid from the phase_map
phase_map = union() -> persist_mut_keyed();
phase_map = union() -> persist_mut_keyed::<'static>();

// set up channels
outbound_chan = tee();
Expand Down
6 changes: 5 additions & 1 deletion hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ impl<'a> Hydroflow<'a> {

let mut op_inst_diagnostics = Vec::new();
meta_graph.insert_node_op_insts_all(&mut op_inst_diagnostics);
assert!(op_inst_diagnostics.is_empty());
assert!(
op_inst_diagnostics.is_empty(),
"Expected no diagnostics, got: {:#?}",
op_inst_diagnostics
);

assert!(self.meta_graph.replace(meta_graph).is_none());
}
Expand Down
13 changes: 4 additions & 9 deletions hydroflow/tests/compile-fail/surface_singleton_badexpr.stderr
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
error[E0308]: mismatched types
--> tests/compile-fail/surface_singleton_badexpr.rs:7:40
|
7 | -> filter(|value| value <= #my_ref)
| ^^^^^^^ expected `&u32`, found integer
|
help: consider removing deref here
|
7 | -> filter(|value| value <= #my_ref)
error: `persist` should have exactly 1 persistence lifetime arguments, actually has 0.
--> tests/compile-fail/surface_singleton_badexpr.rs:6:16
|
6 | -> persist()
| ^^^^^^^
2 changes: 1 addition & 1 deletion hydroflow/tests/compile-fail/surface_singleton_nostate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #my_ref.as_reveal_ref())
-> null();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #my_ref.as_reveal_ref() && value <= #unknown.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #unknown.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub fn main() {
let mut df = hydroflow::hydroflow_syntax! {
my_ref = source_iter(15..=25) -> null();
source_iter(10..=30)
-> persist()
-> persist::<'static>()
-> filter(|value| value <= #unknown.as_reveal_ref() && value <= #my_ref.as_reveal_ref())
-> null();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([()])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) for_each(|_| time.set(Some(context.current_tick_start().elapsed().unwrap())))", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
n1v1 -> n2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([()])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[/"(3v1) <code>for_each(|_| time.set(Some(context.current_tick_start().elapsed().unwrap())))</code>"\]:::pushClass
2v1-->3v1
1v1-->2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([1, 2, 3])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"]
n4v1 [label="(n4v1) for_each(|v| pull_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) persist()", shape=house, fillcolor="#ffff88"]
n5v1 [label="(n5v1) persist::<'static>()", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) for_each(|v| push_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"]
n2v1 -> n3v1
n1v1 -> n2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([1, 2, 3])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[/"(3v1) <code>tee()</code>"\]:::pushClass
4v1[/"(4v1) <code>for_each(|v| pull_tx.send(v).unwrap())</code>"\]:::pushClass
5v1[/"(5v1) <code>persist()</code>"\]:::pushClass
5v1[/"(5v1) <code>persist::&lt;'static&gt;()</code>"\]:::pushClass
6v1[/"(6v1) <code>for_each(|v| push_tx.send(v).unwrap())</code>"\]:::pushClass
2v1-->3v1
1v1-->2v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ digraph {
node [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace", style=filled];
edge [fontname="Monaco,Menlo,Consolas,&quot;Droid Sans Mono&quot;,Inconsolata,&quot;Courier New&quot;,monospace"];
n1v1 [label="(n1v1) source_iter([1])", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist()", shape=invhouse, fillcolor="#88aaff"]
n2v1 [label="(n2v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"]
n4v1 [label="(n4v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"]
n5v1 [label="(n5v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"]
n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa
1v1[\"(1v1) <code>source_iter([1])</code>"/]:::pullClass
2v1[\"(2v1) <code>persist()</code>"/]:::pullClass
3v1[\"(3v1) <code>persist()</code>"/]:::pullClass
2v1[\"(2v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
3v1[\"(3v1) <code>persist::&lt;'static&gt;()</code>"/]:::pullClass
4v1[\"(4v1) <code>fold(|| 0, |a: &amp;mut _, b| *a += b)</code>"/]:::pullClass
5v1[/"(5v1) <code>for_each(|x| result_send.send(x).unwrap())</code>"\]:::pushClass
6v1["(6v1) <code>handoff</code>"]:::otherClass
Expand Down
Loading
Loading