Skip to content

Commit

Permalink
skip try_recv_engine_message if enable PIPE_EXEC_LAYER_EXT_V2
Browse files Browse the repository at this point in the history
  • Loading branch information
nekomoto911 committed Jan 5, 2025
1 parent d9ceb7a commit 8553373
Showing 1 changed file with 50 additions and 46 deletions.
96 changes: 50 additions & 46 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use reth_engine_primitives::InvalidBlockHook;
use reth_pipe_exec_layer_ext::PIPE_EXEC_LAYER_EXT;
use reth_pipe_exec_layer_ext_v2::{
PipeExecLayerEvent, PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
PipeExecLayerEvent, PipeExecLayerExt as PipeExecLayerExtV2,
PIPE_EXEC_LAYER_EXT as PIPE_EXEC_LAYER_EXT_V2,
};

/// Keeps track of the state of the tree.
Expand Down Expand Up @@ -626,34 +627,33 @@ where
(incoming, outgoing)
}

fn try_recv_pipe_exec_event(&self) -> Result<Option<PipeExecLayerEvent>, RecvError> {
if let Some(ext) = PIPE_EXEC_LAYER_EXT_V2.get() {
if self.persistence_state.in_progress() {
let mut waited_time_ms = 0;
loop {
match ext.event_rx.blocking_lock().try_recv() {
Ok(event) => return Ok(Some(event)),
Err(mpsc::error::TryRecvError::Empty) => {
if waited_time_ms > 500 {
// timeout
return Ok(None);
}
std::thread::sleep(std::time::Duration::from_millis(10));
waited_time_ms += 10;
fn try_recv_pipe_exec_event(
&self,
pipe_exec_layer_ext: &PipeExecLayerExtV2,
) -> Result<Option<PipeExecLayerEvent>, RecvError> {
if self.persistence_state.in_progress() {
let mut waited_time_ms = 0;
loop {
match pipe_exec_layer_ext.event_rx.blocking_lock().try_recv() {
Ok(event) => return Ok(Some(event)),
Err(mpsc::error::TryRecvError::Empty) => {
if waited_time_ms > 500 {
// timeout
return Ok(None);
}
Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
std::thread::sleep(std::time::Duration::from_millis(10));
waited_time_ms += 10;
}
}
} else {
let event = ext.event_rx.blocking_lock().blocking_recv();
if event.is_some() {
Ok(event)
} else {
Err(RecvError)
Err(mpsc::error::TryRecvError::Disconnected) => return Err(RecvError),
}
}
} else {
Ok(None)
let event = pipe_exec_layer_ext.event_rx.blocking_lock().blocking_recv();
if event.is_some() {
Ok(event)
} else {
Err(RecvError)
}
}
}

Expand Down Expand Up @@ -687,31 +687,35 @@ where
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
// Wait for the pipe exec layer to be initialized
std::thread::sleep(std::time::Duration::from_secs(3));
let pipe_exec_layer_ext = PIPE_EXEC_LAYER_EXT_V2.get();
loop {
match self.try_recv_pipe_exec_event() {
Ok(Some(event)) => self.on_pipe_exec_event(event),
Ok(None) => {}
Err(RecvError) => {
error!(target: "engine::tree", "Pipe exec layer channel disconnected");
return
}
}

match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
match pipe_exec_layer_ext {
Some(ext) => match self.try_recv_pipe_exec_event(ext) {
Ok(Some(event)) => self.on_pipe_exec_event(event),
Ok(None) => {}
Err(RecvError) => {
error!(target: "engine::tree", "Pipe exec layer channel disconnected");
return
}
}
Ok(None) => {
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(_err) => {
error!(target: "engine::tree", "Engine channel disconnected");
return
}
},
None => match self.try_recv_engine_message() {
Ok(Some(msg)) => {
debug!(target: "engine::tree", %msg, "received new engine message");
if let Err(fatal) = self.on_engine_message(msg) {
error!(target: "engine::tree", %fatal, "insert block fatal error");
return
}
}
Ok(None) => {
debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(_err) => {
error!(target: "engine::tree", "Engine channel disconnected");
return
}
},
}

if let Err(err) = self.advance_persistence() {
Expand Down

0 comments on commit 8553373

Please sign in to comment.