Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,12 @@ pub(super) struct OpPayloadBuilder<Pool, Client, BuilderTx> {
pub pool: Pool,
/// Node client
pub client: Client,
/// Sender for sending built payloads to [`PayloadHandler`],
/// which broadcasts outgoing payloads via p2p.
pub payload_tx: mpsc::Sender<OpBuiltPayload>,
/// Sender for sending built flashblock payloads to [`PayloadHandler`],
/// which broadcasts outgoing flashblock payloads via p2p.
pub built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
/// Sender for sending built full block payloads to [`PayloadHandler`],
/// which updates the engine tree state.
pub built_payload_tx: mpsc::Sender<OpBuiltPayload>,
/// WebSocket publisher for broadcasting flashblocks
/// to all connected subscribers.
pub ws_pub: Arc<WebSocketPublisher>,
Expand All @@ -199,7 +202,8 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
client: Client,
config: BuilderConfig<FlashblocksConfig>,
builder_tx: BuilderTx,
payload_tx: mpsc::Sender<OpBuiltPayload>,
built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
built_payload_tx: mpsc::Sender<OpBuiltPayload>,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
) -> Self {
Expand All @@ -208,7 +212,8 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
evm_config,
pool,
client,
payload_tx,
built_fb_payload_tx,
built_payload_tx,
ws_pub,
config,
metrics,
Expand Down Expand Up @@ -402,10 +407,17 @@ where
!disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync
)?;

self.payload_tx
self.built_fb_payload_tx
.send(payload.clone())
.await
.map_err(PayloadBuilderError::other)?;
if let Err(e) = self.built_payload_tx.send(payload.clone()).await {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
}
best_payload.set(payload);

info!(
Expand Down Expand Up @@ -848,10 +860,17 @@ where
.ws_pub
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;
self.payload_tx
self.built_fb_payload_tx
.send(new_payload.clone())
.await
.wrap_err("failed to send built payload to handler")?;
if let Err(e) = self.built_payload_tx.send(new_payload.clone()).await {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
}
best_payload.set(new_payload);

// Record flashblock build duration
Expand Down
24 changes: 16 additions & 8 deletions crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ use tracing::warn;
/// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder.
/// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder.
pub(crate) struct PayloadHandler<Client> {
// receives new payloads built by this builder.
built_rx: mpsc::Receiver<OpBuiltPayload>,
// receives new flashblock payloads built by this builder.
built_fb_payload_rx: mpsc::Receiver<OpBuiltPayload>,
// receives new full block payloads built by this builder.
built_payload_rx: mpsc::Receiver<OpBuiltPayload>,
// receives incoming p2p messages from peers.
p2p_rx: mpsc::Receiver<Message>,
// outgoing p2p channel to broadcast new payloads to peers.
Expand All @@ -50,7 +52,8 @@ where
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
built_rx: mpsc::Receiver<OpBuiltPayload>,
built_fb_payload_rx: mpsc::Receiver<OpBuiltPayload>,
built_payload_rx: mpsc::Receiver<OpBuiltPayload>,
p2p_rx: mpsc::Receiver<Message>,
p2p_tx: mpsc::Sender<Message>,
payload_events_handle: tokio::sync::broadcast::Sender<Events<OpEngineTypes>>,
Expand All @@ -59,7 +62,8 @@ where
cancel: tokio_util::sync::CancellationToken,
) -> Self {
Self {
built_rx,
built_fb_payload_rx,
built_payload_rx,
p2p_rx,
p2p_tx,
payload_events_handle,
Expand All @@ -71,7 +75,8 @@ where

pub(crate) async fn run(self) {
let Self {
mut built_rx,
mut built_fb_payload_rx,
mut built_payload_rx,
mut p2p_rx,
p2p_tx,
payload_events_handle,
Expand All @@ -84,12 +89,15 @@ where

loop {
tokio::select! {
Some(payload) = built_rx.recv() => {
Some(payload) = built_fb_payload_rx.recv() => {
// ignore error here; if p2p was disabled, the channel will be closed.
let _ = p2p_tx.send(payload.into()).await;
}
Some(payload) = built_payload_rx.recv() => {
// Update engine tree state with locally built block payloads
if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) {
warn!(e = ?e, "failed to send BuiltPayload event");
}
// ignore error here; if p2p was disabled, the channel will be closed.
let _ = p2p_tx.send(payload.into()).await;
}
Some(message) = p2p_rx.recv() => {
match message {
Expand Down
5 changes: 5 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl FlashblocksServiceBuilder {
};

let metrics = Arc::new(OpRBuilderMetrics::default());
// Channels for built flashblock payloads
let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16);
// Channels for built full block payloads
let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16);

let ws_pub: Arc<WebSocketPublisher> =
Expand All @@ -118,6 +121,7 @@ impl FlashblocksServiceBuilder {
ctx.provider().clone(),
self.0.clone(),
builder_tx,
built_fb_payload_tx,
built_payload_tx,
ws_pub.clone(),
metrics.clone(),
Expand Down Expand Up @@ -145,6 +149,7 @@ impl FlashblocksServiceBuilder {
.wrap_err("failed to create flashblocks payload builder context")?;

let payload_handler = PayloadHandler::new(
built_fb_payload_rx,
built_payload_rx,
incoming_message_rx,
outgoing_message_tx,
Expand Down