Skip to content

Commit

Permalink
fix(hydroflow_lang): multiset_delta incorrect `is_first_run_this_ti…
Browse files Browse the repository at this point in the history
…ck` check, fixes #958 (#959)

Introduced in #906

Also adds more `multiset_delta` tests.

Refs: #958, #906
  • Loading branch information
MingweiSamuel authored Nov 7, 2023
1 parent f327b02 commit 43280cb
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 6 deletions.
136 changes: 134 additions & 2 deletions hydroflow/tests/surface_multiset_delta.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use hydroflow::assert_graphvis_snapshots;
use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_multiset_delta() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();

let mut flow = hydroflow::hydroflow_syntax! {
let mut flow = hydroflow_syntax! {
source_stream(input_recv)
-> multiset_delta()
-> for_each(|x| result_send.send(x).unwrap());
Expand Down Expand Up @@ -50,3 +50,135 @@ pub fn test_persist_multiset_delta() {
flow.run_tick();
assert!(collect_ready::<Vec<_>, _>(&mut output_recv).is_empty());
}

#[multiplatform_test]
pub fn test_multiset_delta_2() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();

let mut flow = hydroflow_syntax! {
source_stream(input_recv)
-> multiset_delta()
-> for_each(|x| result_send.send(x).unwrap());
};

input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
flow.run_tick();
assert_eq!(&[3, 4, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));

input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
input_send.send(3).unwrap();
flow.run_tick();

assert_eq!(&[3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}

#[multiplatform_test]
fn test_chat_app_replay() {
let (users_send, users) = hydroflow::util::unbounded_channel::<u32>();
let (messages_send, messages) = hydroflow::util::unbounded_channel::<String>();
let (out, mut out_recv) = hydroflow::util::unbounded_channel::<(u32, String)>();

let mut chat_server = hydroflow_syntax! {
users = source_stream(users) -> persist();
messages = source_stream(messages) -> persist();
users -> [0]crossed;
messages -> [1]crossed;
crossed = cross_join::<'tick, 'tick>()
-> multiset_delta()
-> for_each(|t| {
out.send(t).unwrap();
});
};

users_send.send(1).unwrap();
users_send.send(2).unwrap();

messages_send.send("hello".to_string()).unwrap();
messages_send.send("world".to_string()).unwrap();

chat_server.run_tick();

assert_eq!(
&[
(1, "hello".to_string()),
(2, "hello".to_string()),
(1, "world".to_string()),
(2, "world".to_string())
],
&*collect_ready::<Vec<_>, _>(&mut out_recv),
);

users_send.send(3).unwrap();

messages_send.send("goodbye".to_string()).unwrap();

chat_server.run_tick();

// fails with: [(1, "hello"), (2, "hello"), (3, "hello"), (1, "world"), (2, "world"), (3, "world"), (1, "goodbye"), (2, "goodbye"), (3, "goodbye")]

assert_eq!(
&[
(3, "hello".to_string()),
(3, "world".to_string()),
(1, "goodbye".to_string()),
(2, "goodbye".to_string()),
(3, "goodbye".to_string())
],
&*collect_ready::<Vec<_>, _>(&mut out_recv),
);
}

#[multiplatform_test]
fn test_chat_app_replay_manual() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(u32, String)>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<(u32, String)>();

let mut flow = hydroflow_syntax! {
source_stream(input_recv)
-> multiset_delta()
-> for_each(|x| result_send.send(x).unwrap());
};

input_send.send((1, "hello".to_owned())).unwrap();
input_send.send((2, "hello".to_owned())).unwrap();
input_send.send((1, "world".to_owned())).unwrap();
input_send.send((2, "world".to_owned())).unwrap();

flow.run_tick();
assert_eq!(
&[
(1, "hello".to_string()),
(2, "hello".to_string()),
(1, "world".to_string()),
(2, "world".to_string())
],
&*collect_ready::<Vec<_>, _>(&mut result_recv),
);

input_send.send((1, "hello".to_owned())).unwrap();
input_send.send((2, "hello".to_owned())).unwrap();
input_send.send((3, "hello".to_owned())).unwrap();
input_send.send((1, "world".to_owned())).unwrap();
input_send.send((2, "world".to_owned())).unwrap();
input_send.send((3, "world".to_owned())).unwrap();
input_send.send((1, "goodbye".to_owned())).unwrap();
input_send.send((2, "goodbye".to_owned())).unwrap();
input_send.send((3, "goodbye".to_owned())).unwrap();

flow.run_tick();
assert_eq!(
&[
(3, "hello".to_string()),
(3, "world".to_string()),
(1, "goodbye".to_string()),
(2, "goodbye".to_string()),
(3, "goodbye".to_string())
],
&*collect_ready::<Vec<_>, _>(&mut result_recv),
);
}
17 changes: 13 additions & 4 deletions hydroflow_lang/src/graph/ops/multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
let #curr_data = #hydroflow.add_state(::std::cell::RefCell::new(#root::rustc_hash::FxHashMap::default()));
};

let filter_fn = quote_spanned! {op_span=>
|item| {
let mut prev_map = #context.state_ref(#prev_data).borrow_mut();
let mut curr_map = #context.state_ref(#curr_data).borrow_mut();
let tick_swap = quote_spanned! {op_span=>
{
if context.is_first_run_this_tick() {
let mut prev_map = #context.state_ref(#prev_data).borrow_mut();
let mut curr_map = #context.state_ref(#curr_data).borrow_mut();
::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map));
curr_map.clear();
}
}
};

let filter_fn = quote_spanned! {op_span=>
|item| {
let mut prev_map = #context.state_ref(#prev_data).borrow_mut();
let mut curr_map = #context.state_ref(#curr_data).borrow_mut();

*curr_map.entry(#[allow(clippy::clone_on_copy)] item.clone()).or_insert(0_usize) += 1;
if let Some(old_count) = prev_map.get_mut(item) {
Expand All @@ -93,10 +100,12 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
};
let write_iterator = if is_pull {
quote_spanned! {op_span=>
#tick_swap
let #ident = #input.filter(#filter_fn);
}
} else {
quote_spanned! {op_span=>
#tick_swap
let #ident = #root::pusherator::filter::Filter::new(#filter_fn, #output);
}
};
Expand Down

0 comments on commit 43280cb

Please sign in to comment.