Skip to content

Commit 7b4d74d

Browse files
authored
Added cfg time_to_keep_mempool_txs_secs + metrics (#686)
## 📝 Summary New parameter time_to_keep_mempool_txs_secs replaces old hardcoded constant BLOCKS_TO_KEEP_TXS. New metric so measure the mem used by mempool txs (so we can tune time_to_keep_mempool_txs_secs). Updated config docs with time_to_keep_mempool_txs_secs and some more. ## 💡 Motivation and Context It was unethical to kill so young txs. ## ✅ I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable)
1 parent 5586d79 commit 7b4d74d

File tree

7 files changed

+111
-40
lines changed

7 files changed

+111
-40
lines changed

crates/rbuilder/src/bin/dummy-builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use rbuilder::{
2626
live_builder::{
2727
base_config::{
2828
default_ip, DEFAULT_EL_NODE_IPC_PATH, DEFAULT_INCOMING_BUNDLES_PORT,
29-
DEFAULT_RETH_DB_PATH,
29+
DEFAULT_RETH_DB_PATH, DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
3030
},
3131
block_list_provider::NullBlockListProvider,
3232
config::create_provider_factory,
@@ -97,6 +97,7 @@ async fn main() -> eyre::Result<()> {
9797
DEFAULT_SERVE_MAX_CONNECTIONS,
9898
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
9999
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
100+
Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS),
100101
);
101102
let (orderpool_sender, orderpool_receiver) =
102103
mpsc::channel(order_input_config.input_channel_buffer_size);

crates/rbuilder/src/live_builder/base_config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ pub struct BaseConfig {
143143
/// Use experimental code for faster finalize
144144
pub faster_finalize: bool,
145145

146+
/// See [OrderPool::time_to_keep_mempool_txs]
147+
pub time_to_keep_mempool_txs_secs: u64,
148+
146149
// backtest config
147150
backtest_fetch_mempool_data_dir: EnvOrValue<String>,
148151
pub backtest_fetch_eth_rpc_url: String,
@@ -542,6 +545,7 @@ pub const DEFAULT_RETH_DB_PATH: &str = "/mnt/data/reth";
542545
/// This will update every 2.4 hours, super reasonable.
543546
pub const DEFAULT_BLOCKLIST_URL_MAX_AGE_HOURS: u64 = 24;
544547
pub const DEFAULT_REQUIRE_NON_EMPTY_BLOCKLIST: bool = false;
548+
pub const DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS: u64 = 60;
545549

546550
impl Default for BaseConfig {
547551
fn default() -> Self {
@@ -591,6 +595,7 @@ impl Default for BaseConfig {
591595
ipc_provider: None,
592596
evm_caching_enable: false,
593597
faster_finalize: false,
598+
time_to_keep_mempool_txs_secs: DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
594599
}
595600
}
596601
}

crates/rbuilder/src/live_builder/order_input/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ use self::{
1212
orderpool::{OrderPool, OrderPoolSubscriptionId},
1313
replaceable_order_sink::ReplaceableOrderSink,
1414
};
15-
use crate::primitives::{serialize::CancelShareBundle, BundleReplacementData, Order};
1615
use crate::provider::StateProviderFactory;
17-
use crate::telemetry::{set_current_block, set_ordepool_count};
16+
use crate::telemetry::{set_current_block, set_ordepool_stats};
17+
use crate::{
18+
live_builder::base_config::DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS,
19+
primitives::{serialize::CancelShareBundle, BundleReplacementData, Order},
20+
};
1821
use alloy_consensus::Header;
1922
use jsonrpsee::RpcModule;
2023
use parking_lot::Mutex;
@@ -101,6 +104,8 @@ pub struct OrderInputConfig {
101104
results_channel_timeout: Duration,
102105
/// Size of the bounded channel.
103106
pub input_channel_buffer_size: usize,
107+
/// See [OrderPool::time_to_keep_mempool_txs]
108+
time_to_keep_mempool_txs: Duration,
104109
}
105110
pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096;
106111
pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50);
@@ -116,6 +121,7 @@ impl OrderInputConfig {
116121
serve_max_connections: u32,
117122
results_channel_timeout: Duration,
118123
input_channel_buffer_size: usize,
124+
time_to_keep_mempool_txs: Duration,
119125
) -> Self {
120126
Self {
121127
ignore_cancellable_orders,
@@ -126,6 +132,7 @@ impl OrderInputConfig {
126132
serve_max_connections,
127133
results_channel_timeout,
128134
input_channel_buffer_size,
135+
time_to_keep_mempool_txs,
129136
}
130137
}
131138

@@ -148,6 +155,7 @@ impl OrderInputConfig {
148155
serve_max_connections: 4096,
149156
results_channel_timeout: Duration::from_millis(50),
150157
input_channel_buffer_size: 10_000,
158+
time_to_keep_mempool_txs: Duration::from_secs(config.time_to_keep_mempool_txs_secs),
151159
})
152160
}
153161

@@ -161,6 +169,7 @@ impl OrderInputConfig {
161169
serve_max_connections: 4096,
162170
server_ip: Ipv4Addr::new(127, 0, 0, 1),
163171
server_port: 0,
172+
time_to_keep_mempool_txs: Duration::from_secs(DEFAULT_TIME_TO_KEEP_MEMPOOL_TXS_SECS),
164173
}
165174
}
166175
}
@@ -211,7 +220,7 @@ where
211220
warn!("ignore_blobs is set to true, some order input is ignored");
212221
}
213222

214-
let orderpool = Arc::new(Mutex::new(OrderPool::new()));
223+
let orderpool = Arc::new(Mutex::new(OrderPool::new(config.time_to_keep_mempool_txs)));
215224
let subscriber = OrderPoolSubscriber {
216225
orderpool: orderpool.clone(),
217226
};
@@ -361,7 +370,7 @@ where
361370

362371
let update_time = start.elapsed();
363372
let (tx_count, bundle_count) = orderpool.content_count();
364-
set_ordepool_count(tx_count, bundle_count);
373+
set_ordepool_stats(tx_count, bundle_count, orderpool.mempool_txs_size());
365374
debug!(
366375
current_block,
367376
tx_count,

crates/rbuilder/src/live_builder/order_input/orderpool.rs

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::primitives::{
22
serialize::CancelShareBundle, BundleReplacementData, Order, OrderId, ShareBundleReplacementKey,
33
};
44
use ahash::HashMap;
5-
use alloy_eips::merge::SLOT_DURATION;
65
use lru::LruCache;
76
use reth::providers::StateProviderBox;
7+
use reth_primitives_traits::InMemorySize;
88
use std::{
99
collections::VecDeque,
1010
num::NonZeroUsize,
@@ -19,9 +19,6 @@ use super::{
1919
ReplaceableOrderPoolCommand,
2020
};
2121

22-
const BLOCKS_TO_KEEP_TXS: u32 = 5;
23-
const TIME_TO_KEEP_TXS: Duration = SLOT_DURATION.saturating_mul(BLOCKS_TO_KEEP_TXS);
24-
2522
const TIME_TO_KEEP_BUNDLE_CANCELLATIONS: Duration = Duration::from_secs(60);
2623
/// Push to pull for OrderSink. Just poll de UnboundedReceiver to get the orders.
2724
#[derive(Debug)]
@@ -69,6 +66,8 @@ pub struct OrderPoolSubscriptionId(u64);
6966
#[derive(Debug)]
7067
pub struct OrderPool {
7168
mempool_txs: Vec<(Order, Instant)>,
69+
/// Sum of measure_tx(order) for all mempool_txs
70+
mempool_txs_size: usize,
7271
/// cancelled bundle, cancellation arrival time
7372
bundle_cancellations: VecDeque<(BundleReplacementData, Instant)>,
7473
bundles_by_target_block: HashMap<u64, BundleBlockStore>,
@@ -77,16 +76,12 @@ pub struct OrderPool {
7776
known_orders: LruCache<(OrderId, u64), ()>,
7877
sinks: HashMap<OrderPoolSubscriptionId, SinkSubscription>,
7978
next_sink_id: u64,
80-
}
81-
82-
impl Default for OrderPool {
83-
fn default() -> Self {
84-
Self::new()
85-
}
79+
/// After this time a mempool tx is dropped.
80+
time_to_keep_mempool_txs: Duration,
8681
}
8782

8883
impl OrderPool {
89-
pub fn new() -> Self {
84+
pub fn new(time_to_keep_mempool_txs: Duration) -> Self {
9085
OrderPool {
9186
mempool_txs: Vec::new(),
9287
bundles_by_target_block: HashMap::default(),
@@ -95,6 +90,8 @@ impl OrderPool {
9590
sinks: Default::default(),
9691
next_sink_id: 0,
9792
bundle_cancellations: Default::default(),
93+
time_to_keep_mempool_txs,
94+
mempool_txs_size: 0,
9895
}
9996
}
10097

@@ -117,6 +114,7 @@ impl OrderPool {
117114
let (order, target_block) = match &order {
118115
Order::Tx(..) => {
119116
self.mempool_txs.push((order.clone(), Instant::now()));
117+
self.mempool_txs_size += Self::measure_tx(order);
120118
(order, None)
121119
}
122120
Order::Bundle(bundle) => {
@@ -232,6 +230,32 @@ impl OrderPool {
232230
self.sinks.remove(id).map(|s| s.sink)
233231
}
234232

233+
/// Retains if order is young and nonces are valid.
234+
pub fn must_retain_order(
235+
inserted_time: &Instant,
236+
order: &Order,
237+
new_state: &StateProviderBox,
238+
time_to_keep_mempool_txs: &Duration,
239+
) -> bool {
240+
if inserted_time.elapsed() > *time_to_keep_mempool_txs {
241+
return false;
242+
}
243+
for nonce in order.nonces() {
244+
if nonce.optional {
245+
continue;
246+
}
247+
let onchain_nonce = new_state
248+
.account_nonce(&nonce.address)
249+
.map_err(|e: reth_errors::ProviderError| error!("Failed to get a nonce: {}", e))
250+
.unwrap_or_default()
251+
.unwrap_or_default();
252+
if onchain_nonce > nonce.nonce {
253+
return false;
254+
}
255+
}
256+
true
257+
}
258+
235259
/// Should be called when last block is updated.
236260
/// It's slow but since it only happens at the start of the block it does now matter.
237261
/// It clears old txs from the mempool and old bundle_cancellations.
@@ -242,23 +266,12 @@ impl OrderPool {
242266
self.bundles_for_current_block.clear();
243267
// remove mempool txs by nonce, time
244268
self.mempool_txs.retain(|(order, time)| {
245-
if time.elapsed() > TIME_TO_KEEP_TXS {
246-
return false;
269+
let retain =
270+
Self::must_retain_order(time, order, new_state, &self.time_to_keep_mempool_txs);
271+
if !retain {
272+
self.mempool_txs_size -= Self::measure_tx(order);
247273
}
248-
for nonce in order.nonces() {
249-
if nonce.optional {
250-
continue;
251-
}
252-
let onchain_nonce = new_state
253-
.account_nonce(&nonce.address)
254-
.map_err(|e| error!("Failed to get a nonce: {}", e))
255-
.unwrap_or_default()
256-
.unwrap_or_default();
257-
if onchain_nonce > nonce.nonce {
258-
return false;
259-
}
260-
}
261-
true
274+
retain
262275
});
263276
//remove old bundle cancellations
264277
while let Some((_, oldest_time)) = self.bundle_cancellations.front() {
@@ -279,4 +292,22 @@ impl OrderPool {
279292
.sum();
280293
(tx_count, bundle_count)
281294
}
295+
296+
pub fn mempool_txs_size(&self) -> usize {
297+
self.mempool_txs_size
298+
}
299+
300+
pub fn measure_tx(order: &Order) -> usize {
301+
match order {
302+
Order::Tx(tx) => tx.size(),
303+
Order::Bundle(_) => {
304+
error!("measure_tx called on a bundle");
305+
0
306+
}
307+
Order::ShareBundle(_) => {
308+
error!("measure_tx called on an sbundle");
309+
0
310+
}
311+
}
312+
}
282313
}

crates/rbuilder/src/primitives/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ use reth_primitives::{
2626
kzg::{BYTES_PER_BLOB, BYTES_PER_COMMITMENT, BYTES_PER_PROOF},
2727
PooledTransaction, Recovered, Transaction, TransactionSigned,
2828
};
29-
use reth_primitives_traits::SignerRecoverable;
29+
use reth_primitives_traits::{InMemorySize, SignerRecoverable};
3030
use serde::{Deserialize, Serialize};
3131
use sha2::{Digest, Sha256};
32-
use std::{cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, str::FromStr, sync::Arc};
32+
use std::{
33+
cmp::Ordering, collections::HashMap, fmt::Display, hash::Hash, mem, str::FromStr, sync::Arc,
34+
};
3335
pub use test_data_generator::TestDataGenerator;
3436
use thiserror::Error;
3537
use uuid::Uuid;
@@ -50,6 +52,13 @@ impl Metadata {
5052
}
5153
}
5254

55+
impl InMemorySize for Metadata {
56+
fn size(&self) -> usize {
57+
mem::size_of::<time::OffsetDateTime>() + // received_at_timestamp
58+
mem::size_of::<Option<Address>>() // refund_identity
59+
}
60+
}
61+
5362
impl Default for Metadata {
5463
fn default() -> Self {
5564
Self::with_current_received_at()
@@ -874,6 +883,14 @@ impl MempoolTx {
874883
}
875884
}
876885

886+
impl InMemorySize for MempoolTx {
887+
fn size(&self) -> usize {
888+
self.tx_with_blobs.tx.inner().size()
889+
+ self.tx_with_blobs.blobs_sidecar.size()
890+
+ self.tx_with_blobs.metadata.size()
891+
}
892+
}
893+
877894
/// Main type used for block building, we build blocks as sequences of Orders
878895
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
879896
pub enum Order {

crates/rbuilder/src/telemetry/metrics/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ register_metrics! {
126126
IntGauge::new("current_block", "Current Block").unwrap();
127127
pub static ORDERPOOL_TXS: IntGauge =
128128
IntGauge::new("orderpool_txs", "Transactions In The Orderpool").unwrap();
129+
130+
pub static ORDERPOOL_TXS_SIZE: IntGauge =
131+
IntGauge::new("orderpool_txs_size", "Aprox in memory size of transactions in the Orderpool (bytes)").unwrap();
129132
pub static ORDERPOOL_BUNDLES: IntGauge =
130133
IntGauge::new("orderpool_bundles", "Bundles In The Orderpool").unwrap();
131134

@@ -437,9 +440,10 @@ pub fn inc_simulation_gas_used(gas: u64) {
437440
SIMULATION_GAS_USED.inc_by(gas);
438441
}
439442

440-
pub fn set_ordepool_count(txs: usize, bundles: usize) {
443+
pub fn set_ordepool_stats(txs: usize, bundles: usize, txs_size: usize) {
441444
ORDERPOOL_TXS.set(txs as i64);
442445
ORDERPOOL_BUNDLES.set(bundles as i64);
446+
ORDERPOOL_TXS_SIZE.set(txs_size as i64);
443447
}
444448

445449
pub fn inc_order_input_rpc_errors(method: &str) {

docs/CONFIG.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ Every field has a default if omitted.
3939
|root_hash_threads| int|Threads used when using reth's native root hash calculation. If 0 global rayon pool is used| 0
4040
| watchdog_timeout_sec| optional int| If now block building is started in this period rbuilder exits.|None|
4141
|live_builders|vec[string]| List of `builders` to be used for live building.<br>Notice that you can define on **builders** some builders and select only a few here.|["mgp-ordering","mp-ordering"]|
42-
|evm_caching_enable|bool|Experimental. If enabled per block EVM execution will be enabled|false|
42+
|evm_caching_enable|bool|If enabled per block EVM execution will be enabled|false|
43+
|faster_finalize|bool| If enabled improves block finalization by catching proofs|false|
44+
|time_to_keep_mempool_txs_secs|u64| /// After this time a mempool tx is dropped.|1|
4345
|backtest_fetch_mempool_data_dir|env/string|Dir used to store mempool data used in backtesting|"/mnt/data/mempool"|
4446
|backtest_fetch_eth_rpc_url|string|url to EL node RPC used in backtesting|"http://127.0.0.1:8545"|
4547
|backtest_fetch_eth_rpc_parallel| int|Number of parallel connections allowed on backtest_fetch_eth_rpc_url|1|
@@ -55,6 +57,7 @@ Every field has a default if omitted.
5557
|relays|vec[RelayConfig]| List of relays used to get validator registration info and/or submitting. Below are the details for RelayConfig fields. Example: <br>[[relays]]<br>name = "relay1"<br>optimistic = true<br>priority = 1<br>url = "https://relay1"<br>use_gzip_for_submit = true<br>use_ssz_for_submit = true<br>mode:full<br><br>[[relays]]<br>name = "relay2"<br>...more params...|[]|
5658
|RelayConfig.name|mandatory string| Human readable name for the relay||
5759
|RelayConfig.url|mandatory string| Url to relay's endpoint||
60+
|RelayConfig.grpc_url|optional string| Url to relay's gRPC endpoint (only bloxroute at 2025/08/20).|None|
5861
|RelayConfig.authorization_header|optional env/string|If set "authorization" header will be added to RPC calls|None|
5962
|RelayConfig.builder_id_header|optional env/string|If set "X-Builder-Id" header will be added to RPC calls|None|
6063
|RelayConfig.api_token_header|optional env/string|If set "X-Api-Token" header will be added to RPC calls|None|
@@ -63,17 +66,18 @@ Every field has a default if omitted.
6366
|RelayConfig.use_gzip_for_submit|optional bool||false|
6467
|RelayConfig.optimistic|optional bool||false|
6568
|RelayConfig.interval_between_submissions_ms|optional int| Caps the submission rate to the relay|None|
66-
|RelayConfig.is_fast|optional bool| If the block bid > ignore_fast_bid_threshold_eth, critical blocks (the ones containing orders with replacement id) will go only to fast relays.|true|
67-
|RelayConfig.is_independent|optional bool| Big blocks (bid value > independent_bid_threshold_eth) will go only to independent relays.|true|
69+
|RelayConfig.max_bid_eth|optional string| Max bid we can submit to this relay. Any bid above this will be skipped.<br>None -> No limit.|None|
70+
|RelayConfig.is_bloxroute|bool|Set to `true` for bloxroute relays to add extra headers.|false|
71+
|RelayConfig.ask_for_filtering_validators|optional bool| Adds "filtering=true" as query to the call relay/v1/builder/validators to get all validators (including those filtering OFAC).<br>On 2025/06/24 only supported by ultrasound.|false|
72+
|RelayConfig.can_ignore_gas_limit|optional bool| If we submit a block with a different gas than the one the validator registered with in this relay the relay does not mind. Useful for gas limit conflicts. On 2025/08/20 only ultrasound confirmed that is ok with this. (we didn't asked the rest yet)|false|
6873
|enabled_relays| vec["string"]| Extra hardcoded relays to add (see DEFAULT_RELAYS in [config.rs](../crates/rbuilder/src/live_builder/config.rs))|[]|
6974
|relay_secret_key|optional env/string|Secret key that will be used to sign normal submissions to the relay.|None|
7075
|optimistic_relay_secret_key|optional env/string|Secret key that will be used to sign optimistic submissions to the relay.|None|
7176
|optimistic_enabled|bool|When enabled builder will make optimistic submissions to optimistic relays|false|
7277
|optimistic_max_bid_value_eth|string| Bids above this value will always be submitted in non-optimistic mode.|"0.0"|
7378
|cl_node_url|vec[env/stirng]| Array if urls to CL clients to get the new payload events|["http://127.0.0.1:3500"]
7479
|genesis_fork_version|optional string|Genesis fork version for the chain. If not provided it will be fetched from the beacon client.|None|
75-
|independent_bid_threshold_eth|optional string|Bids above this value will only go to independent relays.| "0"|
76-
|ignore_fast_bid_threshold_eth|optional string|For bids below this value we ignore RelayConfig::is_fast (it's like is_fast is true for all relays)| "1000"|
80+
|scraped_bids_publisher_url|string| Url to connect to the bid scraper service| "tcp://0.0.0.0:5555"|
7781
## Building algorithms
7882
rbuilder can multiple building algorithms and each algorithm can be instantiated multiple times with it's own set of parameters each time.
7983
Each instantiated algorithm starts with:

0 commit comments

Comments
 (0)