Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus): add utility to dedup tees when debugging IR #1491

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions hydroflow_plus/src/ir.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::panic;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::ops::Deref;
use std::rc::Rc;

Expand Down Expand Up @@ -36,7 +37,7 @@ impl ToTokens for DebugExpr {
}
}

impl std::fmt::Debug for DebugExpr {
impl Debug for DebugExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.to_token_stream())
}
Expand All @@ -47,7 +48,7 @@ pub enum DebugInstantiate {
Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
}

impl std::fmt::Debug for DebugInstantiate {
impl Debug for DebugInstantiate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<network instantiate>")
}
Expand All @@ -56,7 +57,7 @@ impl std::fmt::Debug for DebugInstantiate {
#[derive(Clone)]
pub struct DebugPipelineFn(pub Rc<dyn Fn() -> Pipeline + 'static>);

impl std::fmt::Debug for DebugPipelineFn {
impl Debug for DebugPipelineFn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<function>")
}
Expand Down Expand Up @@ -208,6 +209,59 @@ impl HfPlusLeaf {
}
}

type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HfPlusNode>, usize>)>>;
thread_local! {
static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}

pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
PRINTED_TEES.with(|printed_tees| {
let mut printed_tees_mut = printed_tees.borrow_mut();
*printed_tees_mut = Some((0, HashMap::new()));
drop(printed_tees_mut);

let ret = f();

let mut printed_tees_mut = printed_tees.borrow_mut();
*printed_tees_mut = None;

ret
})
}

pub struct TeeNode(pub Rc<RefCell<HfPlusNode>>);

impl Debug for TeeNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
PRINTED_TEES.with(|printed_tees| {
let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
let printed_tees_mut = printed_tees_mut_borrow.as_mut();

if let Some(printed_tees_mut) = printed_tees_mut {
if let Some(existing) = printed_tees_mut
.1
.get(&(self.0.as_ref() as *const RefCell<HfPlusNode>))
{
write!(f, "<tee {}>", existing)
} else {
let next_id = printed_tees_mut.0;
printed_tees_mut.0 += 1;
printed_tees_mut
.1
.insert(self.0.as_ref() as *const RefCell<HfPlusNode>, next_id);
drop(printed_tees_mut_borrow);
write!(f, "<tee {}>: ", next_id)?;
Debug::fmt(&self.0.borrow(), f)
}
} else {
drop(printed_tees_mut_borrow);
write!(f, "<tee>: ")?;
Debug::fmt(&self.0.borrow(), f)
}
})
}
}

/// An intermediate node in a Hydroflow+ graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
#[derive(Debug)]
Expand All @@ -225,7 +279,7 @@ pub enum HfPlusNode {
},

Tee {
inner: Rc<RefCell<HfPlusNode>>,
inner: TeeNode,
},

Persist(Box<HfPlusNode>),
Expand Down Expand Up @@ -383,19 +437,19 @@ impl<'a> HfPlusNode {

HfPlusNode::Tee { inner } => {
if let Some(transformed) =
seen_tees.get(&(inner.as_ref() as *const RefCell<HfPlusNode>))
seen_tees.get(&(inner.0.as_ref() as *const RefCell<HfPlusNode>))
{
*inner = transformed.clone();
*inner = TeeNode(transformed.clone());
} else {
let transformed_cell = Rc::new(RefCell::new(HfPlusNode::Placeholder));
seen_tees.insert(
inner.as_ref() as *const RefCell<HfPlusNode>,
inner.0.as_ref() as *const RefCell<HfPlusNode>,
transformed_cell.clone(),
);
let mut orig = inner.replace(HfPlusNode::Placeholder);
let mut orig = inner.0.replace(HfPlusNode::Placeholder);
transform(&mut orig, seen_tees);
*transformed_cell.borrow_mut() = orig;
*inner = transformed_cell;
*inner = TeeNode(transformed_cell);
}
}

Expand Down Expand Up @@ -598,11 +652,13 @@ impl<'a> HfPlusNode {
}

HfPlusNode::Tee { inner } => {
if let Some(ret) = built_tees.get(&(inner.as_ref() as *const RefCell<HfPlusNode>)) {
if let Some(ret) = built_tees.get(&(inner.0.as_ref() as *const RefCell<HfPlusNode>))
{
ret.clone()
} else {
let (inner_ident, inner_location_id) =
inner
.0
.borrow()
.emit(graph_builders, built_tees, next_stmt_id);

Expand All @@ -618,7 +674,7 @@ impl<'a> HfPlusNode {
});

built_tees.insert(
inner.as_ref() as *const RefCell<HfPlusNode>,
inner.0.as_ref() as *const RefCell<HfPlusNode>,
(tee_ident.clone(), inner_location_id),
);

Expand Down
14 changes: 7 additions & 7 deletions hydroflow_plus/src/persist_pullup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ fn persist_pullup_node(
HfPlusNode::Delta(box HfPlusNode::Persist(box behind_persist)) => behind_persist,

HfPlusNode::Tee { inner } => {
if persist_pulled_tees.contains(&(inner.as_ref() as *const RefCell<HfPlusNode>)) {
if persist_pulled_tees.contains(&(inner.0.as_ref() as *const RefCell<HfPlusNode>)) {
HfPlusNode::Persist(Box::new(HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}))
} else if matches!(*inner.borrow(), HfPlusNode::Persist(_)) {
persist_pulled_tees.insert(inner.as_ref() as *const RefCell<HfPlusNode>);
} else if matches!(*inner.0.borrow(), HfPlusNode::Persist(_)) {
persist_pulled_tees.insert(inner.0.as_ref() as *const RefCell<HfPlusNode>);
if let HfPlusNode::Persist(box behind_persist) =
inner.replace(HfPlusNode::Placeholder)
inner.0.replace(HfPlusNode::Placeholder)
{
*inner.borrow_mut() = behind_persist;
*inner.0.borrow_mut() = behind_persist;
} else {
unreachable!()
}

HfPlusNode::Persist(Box::new(HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}))
} else {
HfPlusNode::Tee { inner }
Expand Down
10 changes: 5 additions & 5 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowState;
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, CycleComplete};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{Location, LocationId};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::Stream;
Expand Down Expand Up @@ -168,7 +168,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Singleton<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -177,7 +177,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Singleton<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down Expand Up @@ -465,7 +465,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Optional<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -474,7 +474,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Optional<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ expression: optimized.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
inner: <tee>: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
},
},
Expand All @@ -29,15 +27,13 @@ expression: optimized.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
inner: <tee>: Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,20 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
inner: <tee>: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
),
},
),
},
},
),
Expand All @@ -38,22 +36,20 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: persist_pullup :: tests :: * ; | v | v + 1 }),
input: Tee {
inner: RefCell {
value: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
inner: <tee>: Persist(
Unpersist(
Persist(
Source {
source: Iter(
{ use crate :: __staged :: persist_pullup :: tests :: * ; 0 .. 10 },
),
location_kind: Process(
0,
),
},
),
),
},
),
},
},
),
Expand Down
6 changes: 3 additions & 3 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use syn::parse_quote;
use super::staging_util::get_this_crate;
use crate::builder::{self, FlowState};
use crate::cycle::{CycleCollection, CycleComplete};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode};
use crate::location::{
CanSend, ExternalBincodeStream, ExternalBytesPort, ExternalProcess, Location, LocationId,
};
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Stream<T, W, C, N> {
if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) {
let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder);
*self.ir_node.borrow_mut() = HfPlusNode::Tee {
inner: Rc::new(RefCell::new(orig_ir_node)),
inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
};
}

Expand All @@ -134,7 +134,7 @@ impl<'a, T: Clone, W, C, N: Location<'a>> Clone for Stream<T, W, C, N> {
location_kind: self.location_kind,
flow_state: self.flow_state.clone(),
ir_node: HfPlusNode::Tee {
inner: inner.clone(),
inner: TeeNode(inner.0.clone()),
}
.into(),
_phantom: PhantomData,
Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus_test/src/cluster/paxos_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ mod tests {
let _ = super::paxos_bench(&builder, 1, 1, 1, 1, 1, 1, 1);
let built = builder.with_default_optimize();

insta::assert_debug_snapshot!(built.ir());
hydroflow_plus::ir::dbg_dedup_tee(|| {
insta::assert_debug_snapshot!(built.ir());
});

let _ = built.compile::<DeployRuntime>(&RuntimeData::new("FAKE"));
}
Expand Down
Loading
Loading