Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ pub mod types;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

pub use types::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
pub use types::{
BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse,
};
5 changes: 3 additions & 2 deletions crates/core/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Bundle, BundleWithMetadata};
use crate::{Bundle, BundleWithMetadata, MeterBundleResponse};
use alloy_consensus::SignableTransaction;
use alloy_primitives::{Address, U256};
use alloy_provider::network::TxSignerSync;
Expand Down Expand Up @@ -38,6 +38,7 @@ pub fn create_test_bundle(
max_timestamp,
..Default::default()
};
let meter_bundle_response = MeterBundleResponse::default();

BundleWithMetadata::load(bundle).unwrap()
BundleWithMetadata::load(bundle, meter_bundle_response).unwrap()
}
58 changes: 45 additions & 13 deletions crates/core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use op_alloy_flz::tx_estimated_size_fjord_bytes;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Block time in microseconds
pub const BLOCK_TIME: u128 = 2_000_000;

#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Bundle {
Expand Down Expand Up @@ -70,10 +73,14 @@ pub struct BundleWithMetadata {
bundle: Bundle,
uuid: Uuid,
transactions: Vec<OpTxEnvelope>,
meter_bundle_response: MeterBundleResponse,
}
Comment on lines 72 to 77
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if a Bundle fails validate_bundle then there wouldn't be a time to meter bundle, which is why I wrapped MeterBundleResponse around an Option

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what BundleWithMetadata is supposed to be. It seems to have two conflicting purposes: 1) transform the bundle data into a more useful format, and 2) provide all the information the bundler needs to receive over Kafka. For (1), it's providing the transaction data twice: once as bytes and once as parsed data structures.

But when I dig into the Kafka publishing code, it looks like BundleWithMetadata is never sent over the wire? So is adding it here actually useful? I think we need a type for the actual Kafka event, which would just have a uuid, a bundle, and a meter_bundle_response. In that type, meter_bundle_response wouldn't be optional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good now, but for context, I'm going to have to stop using BundleWithMetadata in node-reth's metering crate since I won't have the meter_bundle_response yet. If there ends up being another type with the utility functions on it (I'm using bundle_with_metadata.bundle_hash()) then I'll use it, otherwise I'll reintroduce a standalone function for it local to that crate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using bundle_with_metadata.bundle_hash()

if that's the case, perhaps in node-reth it can remain as Bundle ?

since as we discussed offline that i believe the intention with BundleWithMetadata is the struct to be sent to kafka with things like the metering results

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the most straightforward thing to do is to never convert it from Bundle at all. But it'd be nice to have a way to pull in utility functions from tips-core. Right now they're all BundleWithMetadata methods, so we'd either want to move them to a new type (in that hypothetical third PR we talked about), change them to standalone utility functions instead of methods, or just reimplement them where BundleWithMetadata doesn't fit.


impl BundleWithMetadata {
pub fn load(mut bundle: Bundle) -> Result<Self, String> {
pub fn load(
mut bundle: Bundle,
meter_bundle_response: MeterBundleResponse,
) -> Result<Self, String> {
let uuid = bundle
.replacement_uuid
.clone()
Expand All @@ -96,6 +103,7 @@ impl BundleWithMetadata {
bundle,
transactions,
uuid,
meter_bundle_response,
})
}

Expand Down Expand Up @@ -178,6 +186,24 @@ pub struct MeterBundleResponse {
pub state_root_time_us: u128,
}

impl Default for MeterBundleResponse {
fn default() -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems only appropriate for use in tests, so I'm not sure if the Default trait approach is the right way to go. I think it'll lead people to populate their bundles with default MeterBundleResponses, which the builder will need to log errors for since they won't be able to be metered. Feel free to leave it as is, but I think I'd change this to a function in the tests (which is admittedly a bit harder for tests defined in an inner module).

Self {
bundle_gas_price: "0".to_string(),
bundle_hash: B256::default(),
coinbase_diff: "0".to_string(),
eth_sent_to_coinbase: "0".to_string(),
gas_fees: "0".to_string(),
results: vec![],
state_block_number: 0,
state_flashblock_index: None,
total_gas_used: 0,
total_execution_time_us: 0,
state_root_time_us: 0,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -197,12 +223,15 @@ mod tests {
let tx1_bytes = tx1.encoded_2718();
let tx2_bytes = tx2.encoded_2718();

let bundle = BundleWithMetadata::load(Bundle {
replacement_uuid: None,
txs: vec![tx1_bytes.clone().into()],
block_number: 1,
..Default::default()
})
let bundle = BundleWithMetadata::load(
Bundle {
replacement_uuid: None,
txs: vec![tx1_bytes.clone().into()],
block_number: 1,
..Default::default()
},
MeterBundleResponse::default(),
)
.unwrap();

assert!(!bundle.uuid().is_nil());
Expand All @@ -225,12 +254,15 @@ mod tests {
assert_eq!(bundle.bundle_hash(), expected_bundle_hash_single);

let uuid = Uuid::new_v4();
let bundle = BundleWithMetadata::load(Bundle {
replacement_uuid: Some(uuid.to_string()),
txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()],
block_number: 1,
..Default::default()
})
let bundle = BundleWithMetadata::load(
Bundle {
replacement_uuid: Some(uuid.to_string()),
txs: vec![tx1_bytes.clone().into(), tx2_bytes.clone().into()],
block_number: 1,
..Default::default()
},
MeterBundleResponse::default(),
)
.unwrap();

assert_eq!(*bundle.uuid(), uuid);
Expand Down
5 changes: 3 additions & 2 deletions crates/ingress-rpc/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl QueuePublisher for KafkaQueuePublisher {
mod tests {
use super::*;
use rdkafka::config::ClientConfig;
use tips_core::{Bundle, BundleWithMetadata};
use tips_core::{Bundle, BundleWithMetadata, MeterBundleResponse};
use tokio::time::{Duration, Instant};

fn create_test_bundle() -> Bundle {
Expand All @@ -93,7 +93,8 @@ mod tests {

let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
let bundle = create_test_bundle();
let bundle_with_metadata = BundleWithMetadata::load(bundle.clone()).unwrap();
let bundle_with_metadata =
BundleWithMetadata::load(bundle.clone(), MeterBundleResponse::default()).unwrap();
let bundle_hash = bundle_with_metadata.bundle_hash();

let start = Instant::now();
Expand Down
45 changes: 35 additions & 10 deletions crates/ingress-rpc/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use op_alloy_network::Optimism;
use reth_rpc_eth_types::EthApiError;
use std::time::{SystemTime, UNIX_EPOCH};
use tips_audit::{BundleEvent, BundleEventPublisher};
use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
use tips_core::{
BLOCK_TIME, Bundle, BundleHash, BundleWithMetadata, CancelBundle, MeterBundleResponse,
};
use tracing::{info, warn};

use crate::queue::QueuePublisher;
Expand Down Expand Up @@ -65,7 +67,10 @@ where
Audit: BundleEventPublisher + Sync + Send + 'static,
{
async fn send_bundle(&self, bundle: Bundle) -> RpcResult<BundleHash> {
let bundle_with_metadata = self.validate_bundle(bundle).await?;
self.validate_bundle(&bundle).await?;
let meter_bundle_response = self.meter_bundle(&bundle).await?;
let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response)
.map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;

let bundle_hash = bundle_with_metadata.bundle_hash();
if let Err(e) = self
Expand Down Expand Up @@ -117,8 +122,9 @@ where
reverting_tx_hashes: vec![transaction.tx_hash()],
..Default::default()
};
let meter_bundle_response = self.meter_bundle(&bundle).await?;

let bundle_with_metadata = BundleWithMetadata::load(bundle)
let bundle_with_metadata = BundleWithMetadata::load(bundle, meter_bundle_response)
.map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
let bundle_hash = bundle_with_metadata.bundle_hash();

Expand Down Expand Up @@ -191,25 +197,44 @@ where
Ok(transaction)
}

async fn validate_bundle(&self, bundle: Bundle) -> RpcResult<BundleWithMetadata> {
async fn validate_bundle(&self, bundle: &Bundle) -> RpcResult<()> {
if bundle.txs.is_empty() {
return Err(
EthApiError::InvalidParams("Bundle cannot have empty transactions".into())
.into_rpc_err(),
);
}

let bundle_with_metadata = BundleWithMetadata::load(bundle.clone())
.map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;
let tx_hashes = bundle_with_metadata.txn_hashes();

let mut total_gas = 0u64;
let mut tx_hashes = Vec::new();
for tx_data in &bundle.txs {
let transaction = self.validate_tx(tx_data).await?;
total_gas = total_gas.saturating_add(transaction.gas_limit());
tx_hashes.push(transaction.tx_hash());
}
validate_bundle(&bundle, total_gas, tx_hashes)?;
validate_bundle(bundle, total_gas, tx_hashes)?;

Ok(bundle_with_metadata)
Ok(())
}

/// `meter_bundle` is used to determine how long a bundle will take to execute. A bundle that
/// is within `BLOCK_TIME` will return the `MeterBundleResponse` that can be passed along
/// to the builder.
async fn meter_bundle(&self, bundle: &Bundle) -> RpcResult<MeterBundleResponse> {
let res: MeterBundleResponse = self
.provider
.client()
.request("base_meterBundle", (bundle,))
.await
.map_err(|e| EthApiError::InvalidParams(e.to_string()).into_rpc_err())?;

// we can save some builder payload building computation by not including bundles
// that we know will take longer than the block time to execute
if res.total_execution_time_us > BLOCK_TIME {
return Err(
EthApiError::InvalidParams("Bundle simulation took too long".into()).into_rpc_err(),
);
}
Ok(res)
}
}