Skip to content

Commit

Permalink
fix: chat and two_pc no longer replay (#967)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhellerstein authored Nov 24, 2023
1 parent 8b63568 commit 0539e2a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 14 deletions.
2 changes: 1 addition & 1 deletion hydroflow/examples/chat/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts
// Pipeline 2: Broadcast messages to all clients
inbound_chan[ChatMsg] -> map(|(_addr, nickname, message, ts)| Message::ChatMsg { nickname, message, ts }) -> [0]broadcast;
clients[1] -> [1]broadcast;
broadcast = cross_join::<'static>() -> [1]outbound_chan;
broadcast = cross_join::<'tick, 'static>() -> [1]outbound_chan;
};

if let Some(graph) = opts.graph {
Expand Down
69 changes: 59 additions & 10 deletions hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
-> map(|s| s.parse::<SocketAddr>().unwrap())
-> tee();

// phase_map tells us what phase each transaction is in
// There are only 3 phases per xid:
// 1. coordinator sends PREPARE, subordinates vote COMMIT/ABORT
// 2. coordinator send final decision, subordinates ACK
// 3. coordinate sends END, subordinates respond with ENDED
// After phase 3 we delete the xid from the phase_map
phase_map = union() -> persist_mut_keyed();

// set up channels
outbound_chan = tee();
outbound_chan[0] -> dest_sink_serde(outbound);
Expand All @@ -31,49 +39,90 @@ pub(crate) async fn run_coordinator(outbound: UdpSink, inbound: UdpStream, opts:
_ => errs.give(m),
});
msgs[errs] -> for_each(|m| println!("Received unexpected message type: {:?}", m));
msgs[endeds] -> null();
msgs[endeds]
-> map(|m: SubordResponse| hydroflow::util::PersistenceKeyed::Delete(m.xid))
-> defer_tick()
-> phase_map;

// we log all messages (in this prototype we just print)
inbound_chan[1] -> for_each(|m| println!("Received {:?}", m));
outbound_chan[1] -> for_each(|(m, a)| println!("Sending {:?} to {:?}", m, a));

// setup broadcast channel to all subords
broadcast_join = cross_join::<'static>() -> outbound_chan;
broadcast_join = cross_join::<'tick, 'static>() -> outbound_chan;
broadcast = union() -> [0]broadcast_join;
subords[1] -> [1]broadcast_join;
subords[2] -> for_each(|s| println!("Subordinate: {:?}", s));


// Phase 1 initiate:
// Given a transaction commit request from stdio, broadcast a Prepare to subordinates
source_stdin()
initiate = source_stdin()
-> filter_map(|l: Result<std::string::String, std::io::Error>| parse_out(l.unwrap()))
-> map(|xid| CoordMsg{xid, mtype: MsgType::Prepare})
-> tee();
initiate
-> flat_map(|xid: u16| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 1)])
-> phase_map;
initiate
-> map(|xid:u16| CoordMsg{xid, mtype: MsgType::Prepare})
-> [0]broadcast;

// Phase 1 responses:
// as soon as we get an abort message for P1, we start Phase 2 with Abort.
// We'll respond to each abort message: this is redundant but correct (and monotone)
msgs[aborts]
abort_p1s = msgs[aborts] -> tee();
abort_p1s
-> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 2)])
-> defer_tick()
-> phase_map;
abort_p1s
-> map(|m: SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::Abort})
-> [1]broadcast;

// count commit votes
// XXX This fold_keyed accumulates xids without bound.
// Should be replaced with a persist_mut_keyed and logic to manage it.
commit_votes = msgs[commits]
-> map(|m: SubordResponse| (m.xid, 1))
-> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val);

// count subordinates
subord_total = subords[0] -> fold::<'tick>(|| 0, |a: &mut _, _b| *a += 1); // -> for_each(|n| println!("There are {} subordinates.", n));
subord_total = subords[0] -> fold::<'static>(|| 0, |a: &mut _, _b| *a += 1);

// If commit_votes for this xid is the same as all_votes, send a P2 Commit message
committed = join() -> map(|(_c, (xid, ()))| xid);
commit_votes -> map(|(xid, c)| (c, xid)) -> [0]committed;
subord_total -> map(|c| (c, ())) -> [1]committed;
committed -> map(|xid| CoordMsg{xid, mtype: MsgType::Commit}) -> [2]broadcast;
committed = join::<'tick,'tick>() -> map(|(_c, (xid, ()))| xid);

// the committed join would succeed forever once a transaction is chosen for commit
// so we filter to only send the P2 commit message and output to screen if this xid is still in Phase 1
// We also transition this xid to Phase 2 to start the next tick
committed -> map(|xid| (xid, ())) -> [0]check_committed;
phase_map -> [1]check_committed;
check_committed = join::<'tick, 'tick>()
-> map(|(xid, (_, phase))| (xid, phase))
-> filter(|(_xid, phase)| *phase == 1)
-> map(|(xid, _phase)| xid)
-> tee();
// update the phase_map
check_committed
-> flat_map(|xid| [hydroflow::util::PersistenceKeyed::Delete(xid), hydroflow::util::PersistenceKeyed::Persist(xid, 2)])
-> defer_tick()
-> phase_map;
// broadcast the P2 commit message
check_committed
-> map(|xid| CoordMsg{xid, mtype: MsgType::Commit})
-> [2]broadcast;

// Handle p2 acknowledgments by sending an End message
msgs[acks] -> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,})
-> [3]broadcast;
ack_p2s = msgs[acks] -> tee();
ack_p2s
-> flat_map(|m: SubordResponse| [hydroflow::util::PersistenceKeyed::Delete(m.xid), hydroflow::util::PersistenceKeyed::Persist(m.xid, 3)])
-> defer_tick()
-> phase_map;
ack_p2s
-> map(|m:SubordResponse| CoordMsg{xid: m.xid, mtype: MsgType::End,})
-> [3]broadcast;

// Handler for ended acknowledgments not necessary; we just print them
};
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/two_pc/subordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) async fn run_subordinate(outbound: UdpSink, inbound: UdpStream, opts:
-> map(|json: Addresses| json.coordinator)
-> map(|s| s.parse::<SocketAddr>().unwrap())
-> inspect(|coordinator| println!("Coordinator: {}", coordinator));
server_addr_join = cross_join::<'static>();
server_addr_join = cross_join::<'tick, 'static>();
server_addr -> [1]server_addr_join;

// set up channels
Expand All @@ -43,7 +43,7 @@ pub(crate) async fn run_subordinate(outbound: UdpSink, inbound: UdpStream, opts:
// in this prototype we choose randomly whether to abort via decide()
report_chan = msgs[prepares] -> map(|m: CoordMsg| SubordResponse {
xid: m.xid,
mtype: if decide(67) { MsgType::Commit } else { MsgType::Abort }
mtype: if decide(80) { MsgType::Commit } else { MsgType::Abort }
});
report_chan -> [0]outbound_chan;

Expand Down
2 changes: 1 addition & 1 deletion lattices/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Take a look at the [`lattice` rustdocs](https://hydro-project.github.io/hydroflo
## Lattices

`lattices` provides implementations of common lattice types:
* [`Min<T>`] and [`Max<T>`] - totally-orderd lattices.
* [`Min<T>`] and [`Max<T>`] - totally-ordered lattices.
* [`set_union::SetUnion<T>`] - set-union lattice of scalar values.
* [`map_union::MapUnion<K, Lat>`] - scalar keys with nested lattice values.
* [`union_find::UnionFind<K>`] - union partitions of a set of scalar values.
Expand Down

0 comments on commit 0539e2a

Please sign in to comment.