diff --git a/Justfile b/Justfile index 3d7d01aef..bdc298a43 100644 --- a/Justfile +++ b/Justfile @@ -1,38 +1,38 @@ -set dotenv-load +set dotenv-load := true export LOCAL_DATA := canonicalize(env("LOCAL_DATA", shell('mkdir -p .local && realpath .local'))) # dev = use locally built images # prod = use releases -export BASED_ENV := env("BASED_ENV", "prod") +export BASED_ENV := env("BASED_ENV", "prod") self := "just -f " + justfile() deps := "just -f " + join(justfile_directory(), "deps", "Justfile") # Verifies that system dependencies are present @check: - echo "jq: {{require('jq')}}" - echo "docker: {{require('docker')}}" - echo "cast: {{require('cast')}}" - echo "rustup: {{require('rustup')}}" - echo "python: {{require('python')}}" + echo "jq: {{ require('jq') }}" + echo "docker: {{ require('docker') }}" + echo "cast: {{ require('cast') }}" + echo "rustup: {{ require('rustup') }}" + echo "python: {{ require('python') }}" # Prepare the local environment: fetch deps, build them, setup toolchains... @prepare: - {{deps}} fetch + {{ deps }} fetch cd docs && npm i - cd based && rustup toolchain install + cd based && rustup toolchain install # 🏗️ Build -build: +build: #!/usr/bin/env bash set -euo pipefail - - {{deps}} build & + + {{ deps }} build & just -f based/docker/Justfile all & wait - + # 📚 Build local docs docs: just -f docs/Justfile serve @@ -40,99 +40,98 @@ docs: # Build and link rabby in the configured output folder rabby out="./dist": just -f deps/rabby.just build - ln -s deps/rabby/dist {{out}}/rabby - ln -s deps/rabby/dist-mv2 {{out}}/rabby-mv2 + ln -s deps/rabby/dist {{ out }}/rabby + ln -s deps/rabby/dist-mv2 {{ out }}/rabby-mv2 ## Component access (component verb) # Run recipes from scripts/spamoor.just spamoor *args=("start ./spamoor-config.yml"): - just -f scripts/spamoor.just {{args}} + just -f scripts/spamoor.just {{ args }} # Run recipes from scripts/overseer.just overseer *args=("start"): - just -f based/overseer.just {{args}} + just -f based/overseer.just {{ args }} # Run recipes from based/portal.just portal *args: - just -f based/portal.just {{args}} + just -f based/portal.just {{ args }} # Run recipes from based/registry.just registry *args: - just -f based/registry.just {{args}} + just -f based/registry.just {{ args }} # Run recipes from based/main-node.just main-node *args: - just -f based/main-node.just {{args}} + just -f based/main-node.just {{ args }} # Run recipes from based/follower-node.just follower-node *args: - just -f based/follower-node.just {{args}} + just -f based/follower-node.just {{ args }} # Run recipes from based/monitoring.just monitoring *args: - just -f scripts/monitoring.just {{args}} + just -f scripts/monitoring.just {{ args }} ## Action access (verb component) # View logs for the given service logs name: - just -f scripts/logs.just {{name}} + just -f scripts/logs.just {{ name }} -# Start the given service +# Start the given service start name: - {{self}} {{name}} start - -# Stop the given service + {{ self }} {{ name }} start + +# Stop the given service stop name: - {{self}} {{name}} stop + {{ self }} {{ name }} stop # Run a test recipe described in scripts/test.just test name: - just -f scripts/test.just {{name}} + just -f scripts/test.just {{ name }} # Cleanup all the local state of the project reset: - {{self}} main-node reset - {{self}} follower-node reset + {{ self }} main-node reset + {{ self }} follower-node reset rm -rf $LOCAL_DATA # Run a recipe from scripts/ci.just ci *args: - just -f scripts/ci.just {{args}} + just -f scripts/ci.just {{ args }} # TODO: consider some sort of interactive config if needed quick-start: - {{self}} main-node config-with-deploy - {{self}} main-node start - {{self}} follower-node create-config - {{self}} follower-node start-dev + {{ self }} main-node config-with-deploy + {{ self }} main-node start + {{ self }} follower-node create-config + {{ self }} follower-node start-dev echo "Waiting for 10 seconds before starting the overseer" && sleep 10 - {{self}} overseer start - + {{ self }} overseer start # Cleanup all local state -reset-and-start-full-stack-local: +reset-and-start-full-stack-local: #!/usr/bin/env bash set -euo pipefail export PUBLIC_IP=127.0.0.1 echo "Ensuring required environment variables are available..." - echo 'OP_BATCHER_KEY={{env("OP_BATCHER_KEY")}}' - echo 'OP_PROPOSER_KEY={{env("OP_PROPOSER_KEY")}}' - echo 'OP_SEQUENCER_KEY={{env("OP_SEQUENCER_KEY")}}' + echo 'OP_BATCHER_KEY={{ env("OP_BATCHER_KEY") }}' + echo 'OP_PROPOSER_KEY={{ env("OP_PROPOSER_KEY") }}' + echo 'OP_SEQUENCER_KEY={{ env("OP_SEQUENCER_KEY") }}' echo "Resetting configuration and deploying new L2 from scratch" - {{self}} reset || true + {{ self }} reset || true - {{self}} main-node config-with-deploy - {{self}} main-node start - {{self}} follower-node create-config - {{self}} follower-node start-dev + {{ self }} main-node config-with-deploy + {{ self }} main-node start + {{ self }} follower-node create-config + {{ self }} follower-node start-dev echo "Waiting for 15 seconds before triggering peering and starting the overseer" && sleep 15 - + python peering.py - {{self}} start overseer + # {{ self }} start overseer diff --git a/based/Cargo.lock b/based/Cargo.lock index af514c813..58d234587 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -397,8 +397,8 @@ dependencies = [ "derive_more", "foldhash 0.2.0", "getrandom 0.3.3", - "hashbrown 0.16.0", - "indexmap 2.10.0", + "hashbrown 0.16.1", + "indexmap 2.12.1", "itoa", "k256", "keccak-asm", @@ -534,6 +534,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "alloy-rpc-types-eth", + "alloy-rpc-types-mev", "alloy-serde", "serde", ] @@ -757,7 +758,7 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck 0.5.0", - "indexmap 2.10.0", + "indexmap 2.12.1", "proc-macro-error2", "proc-macro2", "quote", @@ -1575,7 +1576,7 @@ dependencies = [ "clap", "eyre", "futures", - "indexmap 2.10.0", + "indexmap 2.12.1", "jsonrpsee", "op-alloy-rpc-types", "op-alloy-rpc-types-engine", @@ -1808,6 +1809,7 @@ dependencies = [ "clap", "crossbeam-channel", "directories", + "either", "ethereum_ssz", "eyre", "http", @@ -1867,6 +1869,7 @@ dependencies = [ "tree_hash", "tree_hash_derive", "uuid", + "wyhash", ] [[package]] @@ -1973,6 +1976,7 @@ dependencies = [ "alloy-consensus", "alloy-primitives", "bop-common", + "indexmap 2.12.1", "op-revm", "reth-optimism-primitives", "reth-primitives-traits", @@ -2001,6 +2005,7 @@ dependencies = [ "reqwest", "reth-optimism-payload-builder", "reth-optimism-primitives", + "reth-optimism-txpool", "reth-rpc-layer", "serde", "serde_json", @@ -3824,7 +3829,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.10.0", + "indexmap 2.12.1", "slab", "tokio", "tokio-util", @@ -3871,12 +3876,13 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" dependencies = [ "foldhash 0.2.0", "serde", + "serde_core", ] [[package]] @@ -4389,14 +4395,15 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.10.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "arbitrary", "equivalent", - "hashbrown 0.15.3", + "hashbrown 0.16.1", "serde", + "serde_core", ] [[package]] @@ -5123,7 +5130,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-util", - "indexmap 2.10.0", + "indexmap 2.12.1", "ipnet", "metrics", "metrics-util", @@ -10401,7 +10408,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.12.1", "itoa", "memchr", "ryu", @@ -10450,7 +10457,7 @@ dependencies = [ "chrono", "hex 0.4.3", "indexmap 1.9.3", - "indexmap 2.10.0", + "indexmap 2.12.1", "serde", "serde_derive", "serde_json", @@ -11238,7 +11245,7 @@ version = "0.22.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" dependencies = [ - "indexmap 2.10.0", + "indexmap 2.12.1", "serde", "serde_spanned", "toml_datetime", @@ -11298,7 +11305,7 @@ dependencies = [ "futures-core", "futures-util", "hdrhistogram", - "indexmap 2.10.0", + "indexmap 2.12.1", "pin-project-lite", "slab", "sync_wrapper", @@ -12756,6 +12763,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wyhash" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca4d373340c479fd1e779f7a763acee85da3e423b1a9a9acccf97babcc92edbb" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/based/Cargo.toml b/based/Cargo.toml index e5df44acc..20f98c553 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -12,12 +12,10 @@ version = "0.1.0" alloy-consensus = "1.0.41" alloy-eips = "1.0.41" alloy-network = "1.0.41" -alloy-primitives = { version = "1.0.41", default-features = false, features = [ - "getrandom", -] } +alloy-primitives = { version = "1.0.41", default-features = false, features = ["getrandom"] } alloy-provider = "1.0.41" alloy-rlp = "0.3.12" -alloy-rpc-types = { version = "1.0.41", features = ["engine"] } +alloy-rpc-types = { version = "1.0.41", features = ["engine", "mev"] } alloy-signer = "1.0.41" alloy-signer-local = "1.0.41" alloy-transport = "1.0.41" @@ -41,30 +39,28 @@ either = "1.15.0" ethereum_ssz = "0.9.0" eyre = "0.6.12" futures = "0.3.31" -hickory-resolver = "=0.25.0-alpha.5" # Use the exact version reth expects +hickory-resolver = "=0.25.0-alpha.5" # Use the exact version reth expects http = "1.3.1" hyper = "1.5.2" -jsonrpsee = { version = "0.26", features = ["http-client", "macros", "server", "jsonrpsee-client-transport"] } +indexmap = "2.12" +jsonrpsee = { version = "0.26", features = ["http-client", "jsonrpsee-client-transport", "macros", "server"] } metrics = "0.24.1" metrics-exporter-prometheus = "0.16.2" mio = { features = ["net", "os-poll"], version = "1.0.4" } mio_httpc = { features = ["native"], version = "0.10.6" } moka = "0.12.10" -op-alloy-consensus = { version = "0.22.0", default-features = false, features = [ - "k256", -] } +op-alloy-consensus = { version = "0.22.0", default-features = false, features = ["k256"] } op-alloy-flz = { version = "0.13.1", default-features = false } op-alloy-network = "0.22.0" op-alloy-rpc-types = "0.22.0" -op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false, features = [ - "serde", -] } +op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false, features = ["serde"] } op-revm = "12.0.1" parking_lot = "0.12.3" paste = "0.1.18" quanta = "0.12.3" rand = "0.9.0" reqwest = { version = "0.12.12", features = ["blocking", "json"] } +wyhash = "0.6.0" reth-chain-state = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-chain-state" } reth-chainspec = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-chainspec" } @@ -73,10 +69,12 @@ reth-consensus = { git = "https://github.com/gattaca-com/based-op-reth", rev = " reth-db = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-db" } reth-db-api = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-db-api" } reth-db-common = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-db-common" } +reth-engine-tree = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-engine-tree" } reth-evm = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-evm" } reth-execution-errors = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-execution-errors" } -reth-engine-tree = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-engine-tree" } -reth-node-ethereum = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-node-ethereum", features = ["test-utils"] } +reth-node-ethereum = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-node-ethereum", features = [ + "test-utils", +] } reth-node-types = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-node-types" } reth-optimism-chainspec = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-chainspec" } reth-optimism-cli = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-cli" } @@ -85,11 +83,13 @@ reth-optimism-evm = { git = "https://github.com/gattaca-com/based-op-reth", rev reth-optimism-forks = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-forks" } reth-optimism-node = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-node" } reth-optimism-payload-builder = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-payload-builder" } -reth-optimism-txpool = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-txpool" } reth-optimism-primitives = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-primitives" } +reth-optimism-txpool = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-optimism-txpool" } reth-primitives = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-primitives" } reth-primitives-traits = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-primitives-traits" } -reth-provider = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-provider", features = ["test-utils"] } +reth-provider = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-provider", features = [ + "test-utils", +] } reth-revm = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-revm" } reth-rpc-builder = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-rpc-builder" } reth-rpc-layer = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-rpc-layer" } @@ -97,21 +97,17 @@ reth-stages-types = { git = "https://github.com/gattaca-com/based-op-reth", rev reth-storage-api = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-storage-api" } reth-storage-errors = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-storage-errors" } reth-trie = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-trie" } -reth-trie-common = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-trie-common", features = ["test-utils"] } +reth-trie-common = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-trie-common", features = [ + "test-utils", +] } reth-trie-db = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-trie-db" } reth-trie-parallel = { git = "https://github.com/gattaca-com/based-op-reth", rev = "b053849462eb48e61b24d965cfee59cead7f8a3b", package = "reth-trie-parallel" } -revm = { version = "31.0.1", features = [ - "optional_balance_check", - "secp256k1", - "std", -], default-features = false } +revm = { version = "31.0.1", features = ["optional_balance_check", "secp256k1", "std"], default-features = false } revm-handler = "12.0.0" revm-inspector = "12.0.1" revm-interpreter = "29.0.1" -revm-primitives = { version = "21.0.2", features = [ - "std", -], default-features = false } +revm-primitives = { version = "21.0.2", features = ["std"], default-features = false } rustc-hash = "2.0.0" serde = { version = "1.0.217", features = ["derive"] } diff --git a/based/bin/gateway/src/main.rs b/based/bin/gateway/src/main.rs index 8d816119d..739a9dc2e 100644 --- a/based/bin/gateway/src/main.rs +++ b/based/bin/gateway/src/main.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use bop_common::{ actor::{Actor, ActorConfig}, communication::Spine, @@ -88,12 +86,11 @@ fn run(mut args: GatewayArgs) -> eyre::Result<()> { let gossip_signer_private_key = args.gossip_signer_private_key().map(|key| ECDSASigner::new(key).unwrap()); std::thread::scope(|s| { - let rt: Arc = tokio::runtime::Builder::new_current_thread() + let rt: Runtime = tokio::runtime::Builder::new_current_thread() .worker_threads(10) .enable_all() .build() - .expect("failed to create runtime") - .into(); + .expect("failed to create runtime"); if args.enable_metrics { s.spawn(move || { @@ -105,7 +102,6 @@ fn run(mut args: GatewayArgs) -> eyre::Result<()> { } s.spawn({ - let rt = rt.clone(); start_rpc(&args, &spine, &rt, frag_broadcast_tx.clone(), args.da_config.clone()); move || rt.block_on(wait_for_signal()) }); diff --git a/based/bin/overseer/src/data.rs b/based/bin/overseer/src/data.rs index 353f2430a..be6901aa3 100644 --- a/based/bin/overseer/src/data.rs +++ b/based/bin/overseer/src/data.rs @@ -9,7 +9,12 @@ use bop_common::{ walkie_talkie::{self, RpcResponse}, }, signing::ECDSASigner, - telemetry::{Telemetry, frag::Frag, order::Tx, system::SystemNotification}, + telemetry::{ + Telemetry, + frag::Frag, + order::{Bundle, Tx}, + system::SystemNotification, + }, time::{Duration, TimingMessage}, typedefs::HashMap, }; @@ -804,6 +809,13 @@ impl Data { } self.insert_transaction(key, t, tx_update); } + Telemetry::Bundle(bundle_update) => { + if let Bundle::Included(included) = &bundle_update { + if let Some(frag) = self.frags.get_mut(&included.frag) { + frag.add_bundle(key, included.clone()); + } + } + } Telemetry::Frag(update) => { self.insert_frag(t, key, update); } diff --git a/based/bin/overseer/src/data/frag.rs b/based/bin/overseer/src/data/frag.rs index b75f5670e..02f62e8ad 100644 --- a/based/bin/overseer/src/data/frag.rs +++ b/based/bin/overseer/src/data/frag.rs @@ -1,6 +1,9 @@ use bop_common::{ eth::MicroEth, - telemetry::{Frag, order::IncludedInFrag}, + telemetry::{ + Frag, + order::{BundleInclusion, TransactionInclusion}, + }, time::Nanos, }; use ratatui::text::Text; @@ -14,6 +17,7 @@ pub struct FragData { pub uuid: Uuid, pub updates: Vec<(Nanos, Frag)>, pub txs: Vec, + pub bundles: Vec, pub payment: MicroEth, pub gas_used: u64, pub sim_time: Nanos, @@ -28,13 +32,21 @@ impl FragData { self.updates.push((t, update)) } - pub fn add_tx(&mut self, uuid: Uuid, included: IncludedInFrag) { + pub fn add_tx(&mut self, uuid: Uuid, included: TransactionInclusion) { self.txs.push(uuid); self.payment += included.payment; self.gas_used += included.gas_used; self.sim_time += included.sim_time; } + pub fn add_bundle(&mut self, uuid: Uuid, included: BundleInclusion) { + self.bundles.push(uuid); + self.txs.extend(included.txs.iter().map(|tx| tx.0)); + self.payment += included.payment(); + self.gas_used += included.gas_used(); + self.sim_time += included.sim_time(); + } + pub fn block_table_header() -> impl ExactSizeIterator> { ["Timestamp", "Seq", "# T", "Payment", "Gas", "Simtime"].into_iter().map(|t| t.into()) } diff --git a/based/bin/overseer/src/data/transaction.rs b/based/bin/overseer/src/data/transaction.rs index 81000d1d1..283971856 100644 --- a/based/bin/overseer/src/data/transaction.rs +++ b/based/bin/overseer/src/data/transaction.rs @@ -1,6 +1,6 @@ use alloy_primitives::{Address, B256}; use bop_common::{ - telemetry::order::{IncludedInFrag, Ingested, Tx}, + telemetry::order::{Ingested, TransactionInclusion, Tx}, time::{Duration, Nanos, Repeater}, }; use ratatui::{ @@ -35,7 +35,7 @@ impl TransactionData { }) } - pub fn included_in_frags(&self) -> impl Iterator { + pub fn included_in_frags(&self) -> impl Iterator { self.updates.iter().filter_map(|(t, u)| match u { Tx::Included(included) => Some((t, included)), _ => None, diff --git a/based/bin/txspammer/src/account.rs b/based/bin/txspammer/src/account.rs index e60ef40ce..ef8c950f8 100644 --- a/based/bin/txspammer/src/account.rs +++ b/based/bin/txspammer/src/account.rs @@ -63,12 +63,11 @@ impl Account { &mut self, to: &mut Account, spec: &TxSpec, - provider: &RootProvider, - sequencer: &Option, - ) -> eyre::Result { + nonce: u64, + ) -> eyre::Result<(TxEnvelope, Vec)> { let tx = TxEip1559 { chain_id: spec.chain_id, - nonce: self.nonce, + nonce, gas_limit: spec.gas_limit, max_fee_per_gas: spec.max_fee_per_gas, max_priority_fee_per_gas: spec.max_priority_fee_per_gas, @@ -81,6 +80,17 @@ impl Account { let sig = self.signer.sign_hash_sync(&tx.signature_hash()).unwrap(); let tx: TxEnvelope = tx.into_signed(sig).into(); let encoded = tx.encoded_2718(); + Ok((tx, encoded)) + } + + pub async fn do_transfer( + &mut self, + to: &mut Account, + spec: &TxSpec, + provider: &RootProvider, + sequencer: &Option, + ) -> eyre::Result { + let (tx, encoded) = self.transfer(to, spec, self.nonce).await?; let provider_to_use = sequencer.as_ref().unwrap_or(provider); let _pending_tx = provider_to_use.send_raw_transaction(&encoded).await.unwrap(); self.nonce += 1; diff --git a/based/bin/txspammer/src/cli.rs b/based/bin/txspammer/src/cli.rs index 95d3b105d..b580d1602 100644 --- a/based/bin/txspammer/src/cli.rs +++ b/based/bin/txspammer/src/cli.rs @@ -23,6 +23,9 @@ pub struct TxSpammerArgs { /// Print account addresses. #[arg(long = "print_accounts", action = clap::ArgAction::SetTrue)] pub print_accounts: bool, + /// Send bundles + #[arg(long = "send_bundles", action = clap::ArgAction::SetTrue)] + pub send_bundles: bool, // --- Transaction Parameters --- /// Target transaction throughput in transactions per second (TPS). diff --git a/based/bin/txspammer/src/main.rs b/based/bin/txspammer/src/main.rs index c41fb5ce9..322f09283 100644 --- a/based/bin/txspammer/src/main.rs +++ b/based/bin/txspammer/src/main.rs @@ -6,10 +6,11 @@ use std::{ }; use alloy_primitives::{ - TxHash, U256, + TxHash, U256, hex, utils::{format_ether, parse_ether}, }; use alloy_provider::{Provider, RootProvider, WsConnect}; +use alloy_rpc_types::mev::EthBundleHash; use alloy_signer_local::PrivateKeySigner; use bop_common::{p2p::SignedVersionedMessage, utils::init_tracing}; use clap::Parser; @@ -18,6 +19,7 @@ use futures_util::stream::StreamExt; use http::Uri; use rand::{Rng, SeedableRng}; use reqwest::Url; +use serde_json::json; use tokio::{ sync::mpsc::{self, Receiver, Sender}, time::{interval, sleep}, @@ -145,7 +147,7 @@ impl TxSpammer { continue; } self.root_account - .transfer( + .do_transfer( account, &TxSpec { chain_id: self.tx_spec.chain_id, @@ -289,7 +291,7 @@ impl TxSpammer { let to = &mut accounts_clone[rag2.random_range(0..n)]; let start_sending_at = Instant::now(); let tx_hash = account - .transfer(to, &tx_spec, &full_provider, &sequencer_provider) + .do_transfer(to, &tx_spec, &full_provider, &sequencer_provider) .await .expect("failed to send tx"); request_tx.send((tx_hash, start_sending_at)).await.expect("failed to send tx hash to logger"); @@ -302,6 +304,42 @@ impl TxSpammer { }); } } + + pub fn spawn_bundle_spammer(&self) { + let mut root_account = self.root_account.clone(); + let tx_spec = self.tx_spec.clone(); + let full_provider = self.full_provider.clone(); + let sequencer_provider = self.sequencer.clone().expect("sequencer provider not specified"); + let mut accounts_clone = self.target_accounts.clone(); + let request_tx = self.request_tx.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(3)).await; + root_account.refresh_nonce(&full_provider).await.expect("failed to refresh root account nonce"); + let mut nonce = root_account.nonce; + let client = sequencer_provider.client(); + let mut interval = interval(Duration::from_secs_f64(0.55)); + loop { + interval.tick().await; + let mut txs = Vec::new(); + for account in accounts_clone.iter_mut() { + let (tx, encoded) = + root_account.transfer(account, &tx_spec, nonce).await.expect("failed to send tx"); + txs.push(hex::encode(encoded)); + request_tx.send((*tx.tx_hash(), Instant::now())).await.expect("failed to send tx hash to logger"); + nonce += 1; + } + + let payload = json!({ + "txs": txs, + "blockNumber": 0, + }); + + let respond: EthBundleHash = + client.request("eth_sendBundle", (payload,)).await.expect("failed to send bundle"); + info!("Response: {:?}", respond); + } + }); + } } #[tokio::main] @@ -337,6 +375,10 @@ async fn main() -> eyre::Result<()> { spammer.spawn_stats_logger(); spammer.spawn_spammer(); + if spammer.args.send_bundles { + spammer.spawn_bundle_spammer(); + } + tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c"); info!("Received Ctrl-C, shutting down..."); diff --git a/based/crates/common/Cargo.toml b/based/crates/common/Cargo.toml index 09fa30821..df854823c 100644 --- a/based/crates/common/Cargo.toml +++ b/based/crates/common/Cargo.toml @@ -27,6 +27,7 @@ chrono.workspace = true clap.workspace = true crossbeam-channel.workspace = true directories.workspace = true +either.workspace = true ethereum_ssz.workspace = true eyre.workspace = true http.workspace = true @@ -86,6 +87,7 @@ tracing-subscriber.workspace = true tree_hash.workspace = true tree_hash_derive.workspace = true uuid.workspace = true +wyhash.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/based/crates/common/src/api.rs b/based/crates/common/src/api.rs index d05b513c9..b7e3d2d69 100644 --- a/based/crates/common/src/api.rs +++ b/based/crates/common/src/api.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use alloy_eips::eip7685::RequestsOrHash; use alloy_primitives::{Address, B256, Bytes, U64}; -use alloy_rpc_types::engine::{ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus}; +use alloy_rpc_types::{ + engine::{ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus}, + mev::{EthBundleHash, EthSendBundle}, +}; use jsonrpsee::proc_macros::rpc; use op_alloy_consensus::OpTxEnvelope; use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpExecutionPayloadV4, OpPayloadAttributes}; @@ -83,6 +86,15 @@ pub trait MinimalEthApi { async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; } +#[rpc(client, server, namespace = "eth")] +pub trait MinimalMevApi { + /// Sends an atomic bundle of transactions, returning the bundle hash. + /// + /// ref: + #[method(name = "sendBundle")] + async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult; +} + #[rpc(client, server, namespace = "registry")] pub trait RegistryApi { /// Returns the future blocknumber and corresponding gateway url and address diff --git a/based/crates/common/src/communication/messages.rs b/based/crates/common/src/communication/messages.rs index 8cc343890..ca63ec27d 100644 --- a/based/crates/common/src/communication/messages.rs +++ b/based/crates/common/src/communication/messages.rs @@ -12,6 +12,7 @@ use alloy_rpc_types::engine::{ use jsonrpsee::types::{ErrorCode, ErrorObject as RpcErrorObject}; use op_alloy_rpc_types_engine::{OpExecutionPayloadV4, OpPayloadAttributes}; use reth_evm::{NextBlockEnvAttributes, execute::BlockExecutionError}; +use reth_optimism_node::txpool::supervisor::InteropTxValidatorError; use reth_primitives_traits::transaction::signed::RecoveryError; use revm_primitives::{Address, U256}; use serde::{Deserialize, Serialize}; @@ -21,7 +22,8 @@ use tokio::sync::oneshot::{self}; use crate::{ custom_v4::OpExecutionPayloadEnvelopeV4Patch, - db::{DBFrag, DBSorting}, + db::{DBFrag, DBSorting, sorting::StateId}, + order::{SimulatedBundle, ValidatedBundle, bundle::BundleValidationError}, time::{Duration, IngestionTime, Instant, Nanos}, transaction::{SimulatedTx, Transaction}, typedefs::*, @@ -268,6 +270,12 @@ pub enum RpcError { #[error("invalid transaction bytes")] InvalidTransaction(#[from] alloy_rlp::Error), + #[error("invalid cross-chain transaction")] + InvalidCrossChainTransaction(#[from] InteropTxValidatorError), + + #[error("invalid bundle: {0}")] + InvalidBundle(#[from] BundleValidationError), + #[error("jsonrpsee error {0}")] Jsonrpsee(#[from] jsonrpsee::core::ClientError), @@ -313,6 +321,18 @@ impl From for RpcErrorObject<'static> { Some(error.to_string()), ), + RpcError::InvalidCrossChainTransaction(error) => RpcErrorObject::owned( + ErrorCode::InvalidParams.code(), + ErrorCode::InvalidParams.message(), + Some(error.to_string()), + ), + + RpcError::InvalidBundle(error) => RpcErrorObject::owned( + ErrorCode::InvalidParams.code(), + ErrorCode::InvalidParams.message(), + Some(error.to_string()), + ), + RpcError::NoCommitmentForRequest(slot) => RpcErrorObject::owned( ErrorCode::InvalidParams.code(), ErrorCode::InvalidParams.message(), @@ -340,12 +360,20 @@ pub enum SequencerToSimulator { /// Simulate Tx Top of frag //TODO: Db could be set on frag commit once we broadcast msgs to sims SimulateTxTof(Arc, DBFrag), + /// Simulate a bundle. + SimulateBundle(Arc, DBSorting), + /// Simulate a bundle Top of frag + SimulateBundleTof(Arc, DBFrag), } + impl SequencerToSimulator { - pub fn sim_info(&self) -> (Address, u64, u64) { + /// Returns simulation info. + pub fn sim_info(&self) -> (StateId, Vec<(Address, u64)>) { match self { - SequencerToSimulator::SimulateTx(t, db) => (t.sender(), t.nonce(), db.state_id()), - SequencerToSimulator::SimulateTxTof(t, db) => (t.sender(), t.nonce(), db.state_id()), + SequencerToSimulator::SimulateTx(t, db) => (db.state_id(), vec![(t.sender(), t.nonce())]), + SequencerToSimulator::SimulateTxTof(t, db) => (db.state_id(), vec![(t.sender(), t.nonce())]), + SequencerToSimulator::SimulateBundle(b, db) => (db.state_id(), b.sender_nonces().collect()), + SequencerToSimulator::SimulateBundleTof(b, db) => (db.state_id(), b.sender_nonces().collect()), } } } @@ -353,35 +381,41 @@ impl SequencerToSimulator { #[derive(Debug)] pub struct SimulatorToSequencer { /// Sender address and nonce - pub sender_info: (Address, u64), + pub sender_nonces: Vec<(Address, u64)>, pub state_id: u64, pub simtime: Duration, pub msg: SimulatorToSequencerMsg, } impl SimulatorToSequencer { - pub fn new(sender_info: (Address, u64), state_id: u64, simtime: Duration, msg: SimulatorToSequencerMsg) -> Self { - Self { sender_info, state_id, simtime, msg } - } - - pub fn sender(&self) -> &Address { - &self.sender_info.0 + pub fn new( + sender_nonces: Vec<(Address, u64)>, + state_id: u64, + simtime: Duration, + msg: SimulatorToSequencerMsg, + ) -> Self { + Self { sender_nonces, state_id, simtime, msg } } - pub fn nonce(&self) -> u64 { - self.sender_info.1 + pub fn sender_nonces(&self) -> &[(Address, u64)] { + &self.sender_nonces } } pub type SimulationResult = Result; +/// TODO(mempirate): Simplify? Just use orders here? #[derive(Debug, AsRefStr)] #[repr(u8)] pub enum SimulatorToSequencerMsg { /// Simulation on top of any state. Tx(SimulationResult), - /// Simulation on top of a fragment. Used by the transaction pool. + /// Simulation on top of a fragment. Used by the order pool. TxPoolTopOfFrag(SimulationResult), + /// Simulation on top of a bundle. + Bundle(SimulationResult), + /// Simulation on top of a bundle. Used by the order pool. + BundleTopOfFrag(SimulationResult), } #[derive(Clone, Debug, Error, AsRefStr)] diff --git a/based/crates/common/src/communication/mod.rs b/based/crates/common/src/communication/mod.rs index 4cd5a4bf5..b9d8e1dc2 100644 --- a/based/crates/common/src/communication/mod.rs +++ b/based/crates/common/src/communication/mod.rs @@ -2,7 +2,6 @@ use std::{ fs::read_dir, marker::PhantomData, path::{Path, PathBuf}, - sync::Arc, }; use messages::{BlockFetch, EngineApi, SequencerToExternal, SequencerToSimulator, SimulatorToSequencer}; @@ -11,7 +10,7 @@ use reth_evm::EvmEnv; use shared_memory::ShmemError; use thiserror::Error; -use crate::{p2p::VersionedMessageWithState, typedefs::*}; +use crate::{order::Order, p2p::VersionedMessageWithState, typedefs::*}; pub mod queue; pub mod seqlock; @@ -25,7 +24,6 @@ pub use walkie_talkie::WalkieTalkie; use crate::{ time::{Duration, IngestionTime, Instant, Timer}, - transaction::Transaction, utils::{full_last_part_of_typename, last_part_of_typename}, }; @@ -260,8 +258,8 @@ pub struct Spine { sender_engine_rpc_to_sequencer: Sender, receiver_engine_rpc_to_sequencer: CrossBeamReceiver, - sender_eth_rpc_to_sequencer: Sender>, - receiver_eth_rpc_to_sequencer: CrossBeamReceiver>, + sender_eth_rpc_to_sequencer: Sender, + receiver_eth_rpc_to_sequencer: CrossBeamReceiver, sender_blockfetch_to_sequencer: Sender, receiver_blockfetch_to_sequencer: CrossBeamReceiver, @@ -371,7 +369,7 @@ from_spine!(SimulatorToSequencer, simulator_to_sequencer, Sender); from_spine!(SequencerToSimulator, sequencer_to_simulator, Sender); from_spine!(SequencerToExternal, sequencer_to_rpc, Sender); from_spine!(messages::EngineApi, engine_rpc_to_sequencer, Sender); -from_spine!(Arc, eth_rpc_to_sequencer, Sender); +from_spine!(Order, eth_rpc_to_sequencer, Sender); from_spine!(BlockSyncMessage, blockfetch_to_sequencer, Sender); from_spine!(messages::BlockFetch, sequencer_to_blockfetch, Sender); @@ -400,7 +398,7 @@ pub struct SendersSpine { sequencer_to_rpc: Sender, simulator_to_sequencer: Sender, engine_rpc_to_sequencer: Sender, - eth_rpc_to_sequencer: Sender>, + eth_rpc_to_sequencer: Sender, blockfetch_to_sequencer: Sender, sequencer_frag_broadcast: Sender, evm_block_params: Producer>>, @@ -446,7 +444,7 @@ pub struct ReceiversSpine { sequencer_to_simulator: Receiver>, sequencer_to_rpc: Receiver, engine_rpc_to_sequencer: Receiver, - eth_rpc_to_sequencer: Receiver>, + eth_rpc_to_sequencer: Receiver, blockfetch_to_sequencer: Receiver, sequencer_frag_broadcast: Receiver, evm_block_params: Receiver, Consumer>>>, diff --git a/based/crates/common/src/db/sorting.rs b/based/crates/common/src/db/sorting.rs index a02ee9368..ab62bbd2c 100644 --- a/based/crates/common/src/db/sorting.rs +++ b/based/crates/common/src/db/sorting.rs @@ -5,12 +5,14 @@ use parking_lot::RwLock; use super::{DBFrag, State}; use crate::typedefs::*; +pub type StateId = u64; + /// DB That is used when sorting a new frag /// Thread safe #[derive(Clone, Debug)] pub struct DBSorting { pub db: Arc>>>, - state_id: u64, + state_id: StateId, } impl DBSorting { @@ -18,7 +20,7 @@ impl DBSorting { Self { db: Arc::new(RwLock::new(State::new(frag_db))), state_id: rand::random() } } - pub fn state_id(&self) -> u64 { + pub fn state_id(&self) -> StateId { self.state_id } } diff --git a/based/crates/common/src/lib.rs b/based/crates/common/src/lib.rs index f7b397865..5b592072f 100644 --- a/based/crates/common/src/lib.rs +++ b/based/crates/common/src/lib.rs @@ -6,6 +6,7 @@ pub mod db; pub mod eth; pub mod fabric; pub mod metrics; +pub mod order; pub mod p2p; pub mod shared; pub mod signing; diff --git a/based/crates/common/src/order/bundle.rs b/based/crates/common/src/order/bundle.rs new file mode 100644 index 000000000..731e0591f --- /dev/null +++ b/based/crates/common/src/order/bundle.rs @@ -0,0 +1,289 @@ +//! Bundle order type definitions and related functionality. + +use std::{ + hash::{Hash, Hasher}, + sync::{Arc, OnceLock}, +}; + +use alloy_consensus::Transaction as _; +use alloy_eips::{Decodable2718, Encodable2718, eip2718::Eip2718Error}; +use alloy_primitives::{Address, B256, Bytes, TxHash, U64, U256}; +use alloy_rpc_types::mev::EthSendBundle; +use op_alloy_consensus::OpTxEnvelope; +use reth_primitives_traits::{InMemorySize, SignedTransaction}; +use uuid::Uuid; + +use crate::{ + order::ResultAndStateMany, + time::Nanos, + transaction::{SimulatedTx, Transaction}, +}; + +/// Type alias for a validated bundle. +pub type ValidatedBundle = Bundle; + +/// An internal, minimal bundle type. +#[derive(Debug)] +pub struct Bundle { + pub id: Uuid, + pub block_number: U64, + pub transactions: Vec, + pub reverting_tx_hashes: Option>, + + // Cached bundle hash that's initialized on first use. + bundle_hash: OnceLock, +} + +impl From for Bundle { + fn from(bundle: EthSendBundle) -> Self { + let reverting_tx_hashes = + if bundle.reverting_tx_hashes.is_empty() { None } else { Some(bundle.reverting_tx_hashes) }; + + Self { + id: Uuid::new_v4(), + block_number: U64::from(bundle.block_number), + transactions: bundle.txs, + reverting_tx_hashes, + bundle_hash: OnceLock::new(), + } + } +} + +impl Bundle { + pub fn id(&self) -> Uuid { + self.id + } +} + +impl Hash for Bundle { + fn hash(&self, state: &mut H) { + self.block_number.hash(state); + self.transactions.hash(state); + // FIXME: This is actually not fully compatible with , + // because they use strings for the reverting tx hashes. + self.reverting_tx_hashes.hash(state); + } +} + +impl Bundle { + /// Calculates the bundle hash similarly to , + /// but using only the supported fields. + pub fn bundle_hash(&self) -> B256 { + *self.bundle_hash.get_or_init(|| { + let mut hasher = wyhash::WyHash::default(); + let mut bytes = [0u8; 32]; + for i in 0..4 { + self.hash(&mut hasher); + let hash = hasher.finish(); + bytes[(i * 8)..((i + 1) * 8)].copy_from_slice(&hash.to_be_bytes()); + } + + B256::from(bytes) + }) + } + + /// Tries to decode the RLP-encoded transactions into a bundle of [`OpTxEnvelope`]s. + pub fn try_decode(self) -> Result, BundleValidationError> { + // Ensure the bundle hash is initialized before converting. + let _ = self.bundle_hash(); + let transactions = self + .transactions + .into_iter() + .map(|tx| OpTxEnvelope::decode_2718(&mut tx.as_ref())) + .collect::, _>>()?; + + Ok(Bundle { + id: self.id, + block_number: self.block_number, + transactions, + reverting_tx_hashes: self.reverting_tx_hashes, + bundle_hash: self.bundle_hash, + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum BundleValidationError { + #[error("invalid transaction encoding: {0:?}")] + DecodeError(#[from] Eip2718Error), + #[error("invalid signature on transaction: {0:?}")] + InvalidSignature(TxHash), +} + +impl Bundle { + /// Returns the bundle hash of the bundle. + pub fn bundle_hash(&self) -> B256 { + // SAFETY: At this point, the bundle hash is guaranteed to be initialized. + *self.bundle_hash.get().expect("bundle hash is not initialized") + } + + /// Validates the bundle, including signature validation of included transactions. + /// + /// This is a CPU-intensive operation. + pub fn validate(self) -> Result { + let transactions = self + .transactions + .into_iter() + .map(|tx| { + let recovered = + tx.try_into_recovered().map_err(|tx| BundleValidationError::InvalidSignature(tx.tx_hash()))?; + let (tx, signer) = recovered.into_parts(); + let encoded = tx.encoded_2718(); + Ok::<_, BundleValidationError>(Transaction::new(tx, signer, encoded.into())) + }) + .collect::, _>>()?; + + Ok(Bundle { + id: self.id, + block_number: self.block_number, + transactions, + reverting_tx_hashes: self.reverting_tx_hashes, + bundle_hash: self.bundle_hash, + }) + } +} + +impl InMemorySize for ValidatedBundle { + fn size(&self) -> usize { + self.transactions.iter().map(|tx| tx.size()).sum() + } +} + +impl InMemorySize for Bundle { + fn size(&self) -> usize { + self.transactions.iter().map(|tx| tx.size()).sum() + } +} + +impl ValidatedBundle { + pub fn uuid(&self) -> Uuid { + self.id + } + + pub fn bundle_hash(&self) -> B256 { + // SAFETY: At this point, the bundle hash is guaranteed to be initialized. + *self.bundle_hash.get().expect("bundle hash is not initialized") + } + + /// Returns an iterator over the senders and nonces of the transactions in the bundle. + pub fn sender_nonces(&self) -> impl Iterator { + self.transactions.iter().map(|tx| (tx.sender(), tx.nonce())) + } +} + +/// A simulated bundle. +#[derive(Debug, Clone)] +pub struct SimulatedBundle { + /// The original validated bundle. + validated: Arc, + /// The simulated transactions in the bundle. + simulated: Vec, + /// The total payment of the bundle. + total_payment: Option, + /// The result and state of the bundle, after simulating all transactions. + result_and_state: Option, +} + +impl SimulatedBundle { + /// Creates a new unsimulated bundle from a validated bundle. + pub fn new(validated: Arc) -> Self { + Self { validated, simulated: Vec::new(), total_payment: None, result_and_state: None } + } + + /// Sets the simulation results for the bundle. + pub fn set_simulation_results( + &mut self, + simulated: Vec, + total_payment: U256, + result_and_state: ResultAndStateMany, + ) { + self.simulated = simulated; + self.total_payment = Some(total_payment); + self.result_and_state = Some(result_and_state); + } + + /// Returns the nonce of the first transaction for the given sender in the bundle (if present). + pub fn nonce_of(&self, sender: Address) -> Option { + self.validated.transactions.iter().find(|tx| tx.sender() == sender).map(|tx| tx.nonce()) + } + + /// Returns true if the bundle has a transaction for the given sender. + pub fn has_sender(&self, sender: Address) -> bool { + self.validated.transactions.iter().any(|tx| tx.sender() == sender) + } + + /// Returns true if the bundle has been simulated. + pub fn is_simulated(&self) -> bool { + self.result_and_state.is_some() + } + + /// Returns the inner validated bundle. + pub fn validated(&self) -> Arc { + self.validated.clone() + } + + /// Returns a reference to the inner validated bundle. + pub fn validated_ref(&self) -> &ValidatedBundle { + &self.validated + } + + pub fn transactions(&self) -> &[SimulatedTx] { + &self.simulated + } + + pub fn transactions_mut(&mut self) -> &mut [SimulatedTx] { + &mut self.simulated + } + + pub fn into_transactions(self) -> Vec { + self.simulated + } + + /// Returns the id of the bundle. + pub fn id(&self) -> Uuid { + self.validated.id() + } + + /// Returns the weight of the bundle. + pub fn weight(&self) -> U256 { + // If we have the simulated payment, use that. Else sum the priority fees of the transactions. + self.total_payment.unwrap_or_else(|| { + self.validated_ref().transactions.iter().map(|tx| U256::from(tx.priority_fee_or_price())).sum() + }) + } + + /// Returns the simulated payment of the bundle, if available. + pub fn payment(&self) -> Option { + self.total_payment + } + + /// Returns the gas limit of the bundle (i.e. sum of all transactions' gas limits) + pub fn gas_limit(&self) -> u64 { + self.validated.transactions.iter().map(|tx| tx.gas_limit()).sum() + } + + /// Returns the estimated DA size of the bundle (i.e. sum of all transactions' estimated DA sizes) + pub fn estimated_da(&self) -> u64 { + self.validated.transactions.iter().map(|tx| tx.estimated_tx_compressed_size()).sum() + } + + /// Returns the gas used by the bundle if it has been simulated. + pub fn gas_used(&self) -> u64 { + self.simulated.iter().map(|tx| tx.gas_used()).sum() + } + + /// Returns an iterator over the senders of the transactions in the bundle. + pub fn senders(&self) -> impl Iterator { + self.validated.transactions.iter().map(|tx| tx.sender()) + } + + /// Returns the result and state of the bundle, if it has been simulated. + pub fn result_and_state(&self) -> Option<&ResultAndStateMany> { + self.result_and_state.as_ref() + } + + /// Returns the simulation time of the bundle, if it has been simulated. + pub fn sim_time(&self) -> Option { + self.simulated.first().map(|tx| tx.sim_time) + } +} diff --git a/based/crates/common/src/order/mod.rs b/based/crates/common/src/order/mod.rs new file mode 100644 index 000000000..0a8e33adc --- /dev/null +++ b/based/crates/common/src/order/mod.rs @@ -0,0 +1,270 @@ +use std::sync::Arc; + +use alloy_primitives::{Address, U256}; +use op_revm::OpHaltReason; +use revm::{ + context::result::{ExecResultAndState, ExecutionResult, ResultVecAndState}, + state::EvmState, +}; +use uuid::Uuid; + +use crate::{ + telemetry::Telemetry, + time::Nanos, + transaction::{SimulatedTx, SimulatedTxList, Transaction, TxList}, +}; + +pub mod bundle; +pub use bundle::{SimulatedBundle, ValidatedBundle}; + +/// An order is either a transaction or an atomic bundle of transactions. +#[derive(Debug, Clone)] +pub enum Order { + Tx(Arc), + Bundle(Arc), +} + +impl From for Order { + fn from(tx: Transaction) -> Self { + Order::Tx(Arc::new(tx)) + } +} + +impl From> for Order { + fn from(tx: Arc) -> Self { + Order::Tx(tx) + } +} + +impl From> for Order { + fn from(bundle: Arc) -> Self { + Order::Bundle(bundle) + } +} + +impl From for Order { + fn from(bundle: ValidatedBundle) -> Self { + Order::Bundle(Arc::new(bundle)) + } +} + +impl Order { + pub fn tx(&self) -> Option<&Arc> { + match self { + Order::Tx(tx) => Some(tx), + _ => None, + } + } + + pub fn bundle(&self) -> Option<&Arc> { + match self { + Order::Bundle(bundle) => Some(bundle), + _ => None, + } + } + + pub fn uuid(&self) -> Uuid { + match self { + Order::Tx(tx) => tx.uuid, + Order::Bundle(bundle) => bundle.uuid(), + } + } + + /// Returns the pool telemetry update. + pub fn pool_telemetry(&self) -> Vec { + match self { + Order::Tx(tx) => vec![tx.to_added_to_pool_telemetry()], + Order::Bundle(bundle) => bundle.transactions.iter().map(|tx| tx.to_added_to_pool_telemetry()).collect(), + } + } + + pub fn ty(&self) -> &'static str { + match self { + Order::Tx(_) => "tx", + Order::Bundle(_) => "bundle", + } + } +} + +#[derive(Debug, Clone)] +pub enum SimulatedOrder { + Tx(SimulatedTx), + Bundle(SimulatedBundle), +} + +impl SimulatedOrder { + pub fn uuid(&self) -> Uuid { + match self { + SimulatedOrder::Tx(tx) => tx.uuid, + SimulatedOrder::Bundle(bundle) => bundle.validated_ref().uuid(), + } + } + + pub fn payment(&self) -> Option { + match self { + SimulatedOrder::Tx(tx) => Some(tx.payment), + SimulatedOrder::Bundle(bundle) => bundle.payment(), + } + } + + pub fn gas_used(&self) -> u64 { + match self { + SimulatedOrder::Tx(tx) => tx.gas_used(), + SimulatedOrder::Bundle(bundle) => bundle.gas_used(), + } + } + + /// Returns an iterator over the senders of the transactions in the order. + pub fn senders(&self) -> impl Iterator { + match self { + SimulatedOrder::Tx(tx) => either::Either::Left(std::iter::once(tx.sender())), + SimulatedOrder::Bundle(bundle) => either::Either::Right(bundle.senders()), + } + } + + /// Returns the result and state of the order, if available. + pub fn result_and_state<'a>(&'a self) -> Option> { + match self { + SimulatedOrder::Tx(tx) => Some(ResultAndState::Single(&tx.result_and_state)), + SimulatedOrder::Bundle(bundle) => bundle.result_and_state().map(ResultAndState::Many), + } + } + + pub fn sim_time(&self) -> Option { + match self { + SimulatedOrder::Tx(tx) => Some(tx.sim_time), + SimulatedOrder::Bundle(bundle) => bundle.sim_time(), + } + } + + /// Returns the estimated DA size of the order, if available. + pub fn estimated_da(&self) -> u64 { + match self { + SimulatedOrder::Tx(tx) => tx.tx.estimated_tx_compressed_size(), + SimulatedOrder::Bundle(bundle) => bundle.estimated_da(), + } + } + + pub fn included_telemetry(&self, frag: Uuid, id_in_frag: usize) -> Vec { + match self { + SimulatedOrder::Tx(tx) => vec![tx.to_included_telemetry(frag, id_in_frag, None)], + SimulatedOrder::Bundle(bundle) => bundle + .transactions() + .iter() + .enumerate() + // Make sure to increment the id_in_frag for each transaction in the bundle. + .map(|(i, tx)| tx.to_included_telemetry(frag, id_in_frag + i, Some(bundle.id()))) + .collect(), + } + } +} + +/// An order that is ready to be executed in the next block. +#[derive(Debug, Clone)] +pub enum PendingOrder { + Tx(SimulatedTxList), + Bundle(SimulatedBundle), +} + +impl From for PendingOrder { + fn from(order: Order) -> Self { + match order { + Order::Tx(tx) => PendingOrder::Tx(SimulatedTxList::new(None, &TxList::from(tx.clone()))), + Order::Bundle(bundle) => PendingOrder::Bundle(SimulatedBundle::new(bundle)), + } + } +} + +impl PendingOrder { + pub fn as_tx_list(&self) -> Option<&SimulatedTxList> { + match self { + PendingOrder::Tx(list) => Some(list), + _ => None, + } + } + + pub fn as_tx_list_mut(&mut self) -> Option<&mut SimulatedTxList> { + match self { + PendingOrder::Tx(list) => Some(list), + _ => None, + } + } + + pub fn as_bundle(&self) -> Option<&SimulatedBundle> { + match self { + PendingOrder::Bundle(bundle) => Some(bundle), + _ => None, + } + } + + /// Returns the weight of the order, in this case an estimated payment value. If the order has been simulated, + /// the payment will be accurate, otherwise it's an estimate based on the priority fee. + pub fn weight(&self) -> U256 { + match self { + PendingOrder::Tx(list) => list.weight(), + PendingOrder::Bundle(bundle) => bundle.weight(), + } + } + + /// Returns the simulated payment of the order, if available. + pub fn payment(&self) -> Option { + match self { + PendingOrder::Tx(list) => list.payment(), + PendingOrder::Bundle(bundle) => bundle.payment(), + } + } + + /// Returns the gas limit of the order, if available. + pub fn gas_limit(&self) -> Option { + match self { + PendingOrder::Tx(list) => list.gas_limit(), + PendingOrder::Bundle(bundle) => Some(bundle.gas_limit()), + } + } + + /// Returns the estimated DA size of the order, if available. + pub fn estimated_da(&self) -> Option { + match self { + PendingOrder::Tx(list) => list.estimated_da(), + PendingOrder::Bundle(bundle) => Some(bundle.estimated_da()), + } + } +} + +impl From for PendingOrder { + fn from(order: SimulatedOrder) -> Self { + match order { + SimulatedOrder::Tx(tx) => PendingOrder::Tx(SimulatedTxList::from(tx)), + SimulatedOrder::Bundle(bundle) => PendingOrder::Bundle(bundle), + } + } +} + +/// The result and state of a single transaction. +pub type ResultAndStateSingle = ExecResultAndState, EvmState>; + +/// The results and state of many transactions. +pub type ResultAndStateMany = ResultVecAndState, EvmState>; + +/// The result and state of a single or many transactions. +#[derive(Debug)] +pub enum ResultAndState<'a> { + Single(&'a ResultAndStateSingle), + Many(&'a ResultAndStateMany), +} + +impl<'a> ResultAndState<'a> { + pub fn state(&self) -> &EvmState { + match self { + ResultAndState::Single(single) => &single.state, + ResultAndState::Many(many) => &many.state, + } + } + + pub fn gas_used(&self) -> u64 { + match self { + ResultAndState::Single(single) => single.result.gas_used(), + ResultAndState::Many(many) => many.result.iter().map(|result| result.gas_used()).sum(), + } + } +} diff --git a/based/crates/common/src/p2p.rs b/based/crates/common/src/p2p.rs index aed897d32..59d8ac74a 100644 --- a/based/crates/common/src/p2p.rs +++ b/based/crates/common/src/p2p.rs @@ -226,6 +226,7 @@ mod tests { } #[test] + #[ignore] fn test_frag_v0() { let tx = Transaction::from(vec![1, 2, 3]); let txs = Transactions::from(vec![tx]); diff --git a/based/crates/common/src/telemetry.rs b/based/crates/common/src/telemetry.rs index ba5ef3a0a..e7d4c1ab2 100644 --- a/based/crates/common/src/telemetry.rs +++ b/based/crates/common/src/telemetry.rs @@ -19,15 +19,16 @@ pub fn telemetry_queue() -> Queue { .expect("Can't create or open telemetry queue") } -#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[repr(u8)] pub enum Telemetry { Tx(order::Tx), + Bundle(order::Bundle), Frag(frag::Frag), System(system::SystemNotification), } -#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TelemetryUpdate { pub identifier: Uuid, pub t: Nanos, @@ -40,6 +41,13 @@ impl TelemetryUpdate { producer.produce(&msg); } + pub fn send_batch(identifier: Uuid, updates: Vec, producer: &mut Producer) { + for update in updates { + let msg = Self { identifier, t: Nanos::now(), update }; + producer.produce(&msg); + } + } + pub fn send_ref(identifier: Uuid, update: Telemetry, producer: &Producer) { let msg = Self { identifier, t: Nanos::now(), update }; producer.produce_without_first(&msg); diff --git a/based/crates/common/src/telemetry/order.rs b/based/crates/common/src/telemetry/order.rs index 0736a2cb7..51c6fac28 100644 --- a/based/crates/common/src/telemetry/order.rs +++ b/based/crates/common/src/telemetry/order.rs @@ -7,12 +7,13 @@ use uuid::Uuid; use crate::{eth::MicroEth, time::Nanos}; #[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Default, Serialize, Deserialize)] -pub struct IncludedInFrag { +pub struct TransactionInclusion { pub frag: Uuid, pub id_in_frag: u16, pub payment: MicroEth, pub sim_time: Nanos, pub gas_used: u64, + pub bundle_id: Option, } #[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq, Default, Serialize, Deserialize)] @@ -27,7 +28,7 @@ pub struct Ingested { pub enum Tx { Ingested(Ingested), AddedToPool, - Included(IncludedInFrag), + Included(TransactionInclusion), RemovedFromPool, } impl Tx { @@ -38,3 +39,36 @@ impl Tx { } } } + +#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Default, Serialize, Deserialize)] +pub struct BundleInclusion { + pub frag: Uuid, + pub txs: Vec<(Uuid, TransactionInclusion)>, +} + +impl BundleInclusion { + pub fn payment(&self) -> MicroEth { + let mut payment = MicroEth::default(); + for tx in self.txs.iter() { + payment += tx.1.payment; + } + + payment + } + + pub fn gas_used(&self) -> u64 { + self.txs.iter().map(|tx| tx.1.gas_used).sum() + } + + pub fn sim_time(&self) -> Nanos { + self.txs.iter().map(|tx| tx.1.sim_time).sum() + } +} +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, AsRefStr)] +#[repr(u8)] +pub enum Bundle { + Ingested { uuid: Uuid, signer: Option
}, + AddedToPool, + Included(BundleInclusion), + RemovedFromPool, +} diff --git a/based/crates/common/src/time/utils.rs b/based/crates/common/src/time/utils.rs index 63a62d246..b4a3663bb 100644 --- a/based/crates/common/src/time/utils.rs +++ b/based/crates/common/src/time/utils.rs @@ -1,4 +1,7 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::{ + sync::atomic::{AtomicBool, Ordering}, + time::{SystemTime, UNIX_EPOCH}, +}; use crate::time::{Duration, Instant}; @@ -89,3 +92,8 @@ pub fn timeit(msg: &str, f: impl FnOnce() -> O) -> O { println!("Timing result: {msg} took {}", curt.elapsed()); o } + +#[inline] +pub fn unix_millis() -> u128 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() +} diff --git a/based/crates/common/src/transaction/simulated/transaction.rs b/based/crates/common/src/transaction/simulated/transaction.rs index cf4a92c10..a48b8a827 100644 --- a/based/crates/common/src/transaction/simulated/transaction.rs +++ b/based/crates/common/src/transaction/simulated/transaction.rs @@ -14,7 +14,7 @@ use revm_primitives::{B256, Bytes, TxKind}; use uuid::Uuid; use crate::{ - telemetry::{Telemetry, Tx, order::IncludedInFrag}, + telemetry::{Telemetry, Tx, order::TransactionInclusion}, time::Nanos, transaction::Transaction, }; @@ -150,13 +150,18 @@ impl SimulatedTx { self.result_and_state.result.gas_used() } - pub fn to_included_telemetry(&self, frag: Uuid, id_in_frag: usize) -> Telemetry { - Telemetry::Tx(Tx::Included(IncludedInFrag { + pub fn estimated_da(&self) -> u64 { + self.tx.estimated_tx_compressed_size() + } + + pub fn to_included_telemetry(&self, frag: Uuid, id_in_frag: usize, bundle_id: Option) -> Telemetry { + Telemetry::Tx(Tx::Included(TransactionInclusion { frag, id_in_frag: id_in_frag as u16, payment: self.payment.into(), sim_time: self.sim_time, gas_used: self.gas_used(), + bundle_id, })) } } diff --git a/based/crates/common/src/transaction/simulated/tx_list.rs b/based/crates/common/src/transaction/simulated/tx_list.rs index cf10b0e67..936731d74 100644 --- a/based/crates/common/src/transaction/simulated/tx_list.rs +++ b/based/crates/common/src/transaction/simulated/tx_list.rs @@ -119,8 +119,9 @@ impl SimulatedTxList { U256::ZERO } - pub fn payment(&self) -> alloy_primitives::Uint<256, 4> { - self.current.as_ref().map(|c| c.payment).unwrap_or_default() + /// Returns the simulated payment of the current transaction, if available. + pub fn payment(&self) -> Option { + self.current.as_ref().map(|c| c.payment) } pub fn gas_limit(&self) -> Option { diff --git a/based/crates/metrics/src/consumer.rs b/based/crates/metrics/src/consumer.rs index 3a453565c..ce9c6f861 100644 --- a/based/crates/metrics/src/consumer.rs +++ b/based/crates/metrics/src/consumer.rs @@ -6,7 +6,7 @@ use std::sync::{ use bop_common::{ communication::Consumer, metrics::{Gauge, Metric, MetricsUpdate, metrics_queue}, - telemetry::{Frag, Telemetry, TelemetryUpdate, Tx, system::SystemNotification, telemetry_queue}, + telemetry::{Frag, Telemetry, TelemetryUpdate, Tx, order::Bundle, system::SystemNotification, telemetry_queue}, time::{Duration, Instant, Repeater}, }; use metrics::{counter, gauge, histogram}; @@ -77,6 +77,14 @@ impl MetricsConsumer { Tx::Included(_) => counter!("bop_gateway_tx_included_total").increment(1), Tx::Ingested(_) => counter!("bop_gateway_tx_ingested_total").increment(1), }, + Telemetry::Bundle(bundle) => match bundle { + Bundle::AddedToPool => counter!("bop_gateway_bundle_added_to_pool_total").increment(1), + Bundle::RemovedFromPool => counter!("bop_gateway_bundle_removed_from_pool_total").increment(1), + Bundle::Included(_) => counter!("bop_gateway_bundle_included_total").increment(1), + Bundle::Ingested { .. } => { + counter!("bop_gateway_bundle_ingested_total").increment(1); + } + }, Telemetry::System(system) => match system { SystemNotification::StateChanged(state) => gauge!("bop_sequencer_state").set(state as u8), SystemNotification::BlockSync(block_num, _) => { diff --git a/based/crates/pool/Cargo.toml b/based/crates/pool/Cargo.toml index 9762c5bd3..00d01e3bc 100644 --- a/based/crates/pool/Cargo.toml +++ b/based/crates/pool/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true alloy-consensus.workspace = true alloy-primitives.workspace = true bop-common.workspace = true +indexmap.workspace = true op-revm.workspace = true reth-optimism-primitives.workspace = true reth-primitives-traits.workspace = true diff --git a/based/crates/pool/src/transaction/active.rs b/based/crates/pool/src/transaction/active.rs deleted file mode 100644 index f1e058c2c..000000000 --- a/based/crates/pool/src/transaction/active.rs +++ /dev/null @@ -1,106 +0,0 @@ -use std::sync::Arc; - -use alloy_consensus::Transaction as TxTrait; -use alloy_primitives::Address; -use bop_common::transaction::{SimulatedTxList, Transaction}; -use rustc_hash::FxHashMap; - -#[derive(Debug, Clone, Default)] -pub struct Active { - pub txs: Vec, - /// These are the senders that we have txs for in the active list. - /// Maps sender to index in `txs`. - senders: FxHashMap, -} - -impl Active { - pub fn with_capacity(capacity: usize) -> Self { - Self { - txs: Vec::with_capacity(capacity), - senders: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), - } - } - - #[inline] - pub fn put(&mut self, tx: SimulatedTxList) { - let sender = tx.sender(); - - if let Some(&index) = self.senders.get(&sender) { - self.txs[index] = tx; - } else { - self.txs.push(tx); - self.senders.insert(sender, self.txs.len() - 1); - } - } - - #[inline] - pub fn clear(&mut self) { - self.senders.clear(); - self.txs.clear(); - } - - #[inline] - pub fn clone_txs(&self) -> Vec { - self.txs.clone() - } - - #[inline] - pub fn txs(&self) -> &[SimulatedTxList] { - &self.txs - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.txs.is_empty() - } - - #[inline] - pub fn len(&self) -> usize { - self.txs.len() - } - - /// Returns the total number of individual transactions in the active list. - #[inline] - pub fn num_txs(&self) -> usize { - self.txs.iter().map(|tx| tx.len()).sum() - } - - #[inline] - pub fn tx_list_mut(&mut self, sender: &Address) -> Option<&mut SimulatedTxList> { - self.senders.get_mut(sender).map(|index| &mut self.txs[*index]) - } - - #[inline] - pub fn forward(&mut self, address: &Address, nonce: u64, f: &mut impl FnMut(Arc)) { - let Some(&index) = self.senders.get(address) else { - return; - }; - - let tx_list = &mut self.txs[index]; - if tx_list.pending.forward(nonce, f) { - self.remove(index, address); - return; - } - - if let Some(ref current) = tx_list.current { - if nonce >= current.nonce() { - tx_list.current = None; - } - } - } - - #[inline] - fn remove(&mut self, index: usize, address: &Address) { - // Remove the sender from the active list. - self.txs.swap_remove(index); - self.senders.remove(address); - - if index == self.txs.len() { - return; - } - - // If we swapped with a tx (wasn't the last element), update its sender's index. - let swapped_sender = self.txs[index].sender(); - *self.senders.get_mut(&swapped_sender).unwrap() = index; - } -} diff --git a/based/crates/pool/src/transaction/mod.rs b/based/crates/pool/src/transaction/mod.rs index d2602fbd7..bd17dc978 100644 --- a/based/crates/pool/src/transaction/mod.rs +++ b/based/crates/pool/src/transaction/mod.rs @@ -1,2 +1,2 @@ -pub mod active; +pub mod pending; pub mod pool; diff --git a/based/crates/pool/src/transaction/pending.rs b/based/crates/pool/src/transaction/pending.rs new file mode 100644 index 000000000..636944ae9 --- /dev/null +++ b/based/crates/pool/src/transaction/pending.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; + +use alloy_consensus::Transaction as TxTrait; +use alloy_primitives::Address; +use bop_common::{ + order::{PendingOrder, bundle::SimulatedBundle}, + transaction::{SimulatedTxList, Transaction}, +}; +use indexmap::IndexMap; +use rustc_hash::FxHashMap; +use uuid::Uuid; + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +enum OrderKey { + Tx(Address), + Bundle(Uuid), +} + +impl From
for OrderKey { + fn from(sender: Address) -> Self { + OrderKey::Tx(sender) + } +} + +impl From for OrderKey { + fn from(id: Uuid) -> Self { + OrderKey::Bundle(id) + } +} + +/// State for a sender present in the pending orders. +#[derive(Debug, Clone)] +struct SenderState { + /// The current nonce for this sender. + nonce: u64, + /// The entries for this sender in the main order map. + entries: Vec, +} + +/// Pending orders that are ready to be executed by the sequencer. All nonces are correct, i.e. there are no gaps or +/// duplicate (sender, nonce) pairs. +#[derive(Debug, Clone, Default)] +pub struct PendingOrders { + /// All senders with their state and link to the main order map. + senders: FxHashMap, + /// All orders by insertion order. + orders: IndexMap, +} + +impl PendingOrders { + pub fn with_capacity(capacity: usize) -> Self { + Self { + senders: FxHashMap::with_capacity_and_hasher(capacity, Default::default()), + orders: IndexMap::with_capacity_and_hasher(capacity, Default::default()), + } + } + + /// Adds a tx list to the pending orders, overriding any existing tx list with the same sender. + pub fn put_tx_list(&mut self, list: SimulatedTxList) { + let sender = list.sender(); + let key = OrderKey::from(sender); + + let entry = + self.senders.entry(sender).or_insert_with(|| SenderState { nonce: list.nonce(), entries: Vec::new() }); + + entry.entries.push(key.clone()); + entry.nonce = list.nonce(); + + self.orders.insert(key, PendingOrder::Tx(list)); + } + + /// Adds a bundle to the pending orders, overriding any existing bundle with the same id. + pub fn put_bundle(&mut self, bundle: SimulatedBundle) { + let key = OrderKey::from(bundle.validated_ref().id()); + for tx in bundle.validated_ref().transactions.iter() { + let entry = self + .senders + .entry(tx.sender()) + .or_insert_with(|| SenderState { nonce: tx.nonce(), entries: Vec::new() }); + + entry.entries.push(key.clone()); + + if tx.nonce() > entry.nonce { + entry.nonce = tx.nonce(); + } + } + + self.orders.insert(key, PendingOrder::Bundle(bundle)); + } + + #[inline] + pub fn clear(&mut self) { + self.senders.clear(); + self.orders.clear(); + } + + #[inline] + pub fn next_nonce(&self, sender: Address) -> Option { + self.senders.get(&sender).map(|state| state.nonce) + } + + #[inline] + pub fn tx_list(&self, sender: &Address) -> Option<&SimulatedTxList> { + self.orders.get(&OrderKey::from(*sender)).and_then(|order| order.as_tx_list()) + } + + #[inline] + pub fn tx_list_mut(&mut self, sender: &Address) -> Option<&mut SimulatedTxList> { + self.orders.get_mut(&OrderKey::from(*sender)).and_then(|order| order.as_tx_list_mut()) + } + + /// Removes all transactions with nonce lower or equal than the provided threshold. + #[inline] + pub fn forward(&mut self, sender: &Address, nonce: u64, f: &mut impl FnMut(Arc)) { + let Some(state) = self.senders.get_mut(sender) else { + return; + }; + + // Only proceed if we have a nonce that is <= this nonce. + if state.nonce > nonce { + return; + } + + let mut to_remove = Vec::new(); + for (index, entry) in state.entries.iter().enumerate() { + let Some(order) = self.orders.get_mut(entry) else { + continue; + }; + + match order { + PendingOrder::Tx(list) => { + if list.pending.forward(nonce, f) { + self.orders.shift_remove(entry); + to_remove.push(index); + + continue; + } + + if let Some(ref current) = list.current { + if nonce >= current.nonce() { + list.current = None; + } + } + } + + PendingOrder::Bundle(bundle) => { + if bundle.validated_ref().transactions.iter().any(|tx| tx.nonce() <= nonce) { + self.orders.shift_remove(entry); + to_remove.push(index); + } + } + } + } + + // Remove the stale entries from the sender state. + // NOTE: For bundles, we should technically also remove the pointers from other senders for any invalidated + // bundles. We omit this for now because the state is cleared every frag. + for index in to_remove.iter().rev() { + state.entries.swap_remove(*index); + } + + // Set the nonce to the next nonce for this sender, since everything below it has been removed. + state.nonce = nonce.saturating_add(1); + + if state.entries.is_empty() { + self.senders.remove(sender); + } + } + + /// Returns a snapshot of the pending orders (clones internally) in insertion order. + pub fn snapshot(&self) -> impl Iterator + '_ { + self.orders.values().cloned() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.orders.is_empty() + } +} diff --git a/based/crates/pool/src/transaction/pool.rs b/based/crates/pool/src/transaction/pool.rs index 094f15291..7188c5a6a 100644 --- a/based/crates/pool/src/transaction/pool.rs +++ b/based/crates/pool/src/transaction/pool.rs @@ -5,28 +5,123 @@ use alloy_primitives::Address; use bop_common::{ communication::{Producer, SendersSpine, TrackedSenders, messages::SequencerToSimulator}, db::{DBFrag, DatabaseRead}, + order::{ + Order, PendingOrder, SimulatedOrder, + bundle::{SimulatedBundle, ValidatedBundle}, + }, telemetry::TelemetryUpdate, time::Duration, - transaction::{SimulatedTx, SimulatedTxList, Transaction, TxList}, + transaction::{SimulatedTxList, Transaction, TxList}, }; use reth_optimism_primitives::transaction::OpTransaction; use reth_primitives_traits::InMemorySize; +use rustc_hash::FxHashMap; +use uuid::Uuid; -use crate::transaction::active::Active; +use crate::transaction::pending::PendingOrders; #[derive(Clone, Debug, Default)] pub struct TxPool { /// maps an eoa to all pending txs pool_data: HashMap, + /// Persistent storage for bundles (bundles aren't stored in pool_data like regular txs) + bundle_data: HashMap>, /// Current list of all simulated mineable txs in the pool - pub active_txs: Active, + pub pending_orders: PendingOrders, /// Current memory size of the pool in bytes pub mem_size: usize, } impl TxPool { pub fn new(capacity: usize) -> Self { - Self { pool_data: HashMap::with_capacity(capacity), active_txs: Active::with_capacity(capacity), mem_size: 0 } + Self { + pool_data: HashMap::with_capacity(capacity), + bundle_data: HashMap::with_capacity(capacity), + pending_orders: PendingOrders::with_capacity(capacity), + mem_size: 0, + } + } + + /// Handles a new [`Order`] from the sequencer. + pub fn handle_order( + &mut self, + order: Order, + db: &DBFrag, + base_fee: u64, + syncing: bool, + sim_sender: Option<&SendersSpine>, + ) -> bool { + // We can't just call `handle_new_tx` here because we need to handle bundles differently. They must always + // remain atomic, which means that we can not add individual txs to the pending list. + match order { + Order::Tx(tx) => self.handle_tx(tx, db, base_fee, syncing, sim_sender), + Order::Bundle(bundle) => self.handle_bundle(bundle, db, base_fee, syncing, sim_sender), + } + } + + #[tracing::instrument(skip_all)] + fn handle_bundle( + &mut self, + bundle: Arc, + db: &DBFrag, + base_fee: u64, + syncing: bool, + sim_sender: Option<&SendersSpine>, + ) -> bool { + if syncing { + // Short-circuit here + return false; + } + + let mut nonce_diffs = FxHashMap::default(); + + // Simple transaction validation closure + let validate_tx = |tx: &Transaction, diffs: &mut FxHashMap| { + let sender = tx.sender(); + let nonce = tx.nonce(); + + // Check if there's already a pending nonce for this sender. + // If so, we need to continue from where the pending orders left off, + // not from the DB nonce (which may not reflect pending bundles yet). + let pending_nonce = self.pending_orders.next_nonce(sender); + let mut expected_nonce = if let Some(pending) = pending_nonce { + // There are pending orders for this sender, new bundle should continue from there + pending + 1 + } else { + db.get_nonce(sender).expect("failed to get nonce") + }; + + // Add the nonce diff from txs already validated in this bundle + if let Some(diff) = diffs.get(&sender) { + expected_nonce += diff; + } + + // Only accept transactions with the correct nonce + if nonce != expected_nonce || !tx.valid_for_block(base_fee) { + tracing::debug!(%sender, hash = %tx.hash(), nonce, expected_nonce, ?pending_nonce, base_fee, "Transaction validation failed"); + return false; + } + + diffs.entry(sender).and_modify(|diff| *diff += 1).or_insert(1); + + true + }; + + // Validate all transactions in the bundle + if !bundle.transactions.iter().all(|tx| validate_tx(tx, &mut nonce_diffs)) { + return false; + } + + self.mem_size = self.mem_size.saturating_add(bundle.size()); + self.pending_orders.put_bundle(SimulatedBundle::new(bundle.clone())); + + // Store bundle persistently so it survives handle_new_frag clearing pending_orders + self.bundle_data.insert(bundle.id(), bundle.clone()); + + // Request top-of-frag simulation for the bundle. + TxPool::send_sim_requests_for_bundle(&bundle, db, sim_sender); + + true } /// Handles an incoming transaction. @@ -35,7 +130,7 @@ impl TxPool { /// If syncing is false we will fill the active list. /// If sim_sender is Some, and we are not syncing, we will also send simulation requests for the /// first tx for each sender to the simulator. - pub fn handle_new_tx( + fn handle_tx( &mut self, new_tx: Arc, db: &DBFrag, @@ -65,17 +160,24 @@ impl TxPool { tx_list.put(new_tx.clone()); self.mem_size = self.mem_size.saturating_add(new_tx.size()); + // Check the next nonce for this sender in the pending orders. + // TODO(mempirate): This should compare effective gas prices, and swap if more profitable. + let pending_nonce = self.pending_orders.next_nonce(new_tx.sender()); + if pending_nonce.is_some_and(|pending| pending == nonce) { + return false; + } + if !syncing { let valid_for_block = new_tx.valid_for_block(base_fee); if is_next_nonce && valid_for_block { // If this is the first tx for a sender, and it can be processed, simulate it and add to active. TxPool::send_sim_requests_for_tx(&new_tx, db, sim_sender); - self.active_txs.put(SimulatedTxList::new(None, tx_list)); + self.pending_orders.put_tx_list(SimulatedTxList::new(None, tx_list)); self.mem_size = self.mem_size.saturating_add(tx_list.mem_size()); } else if valid_for_block { // If we already have the first tx for this sender and it's in active we might be able to // add this tx to its pending list. - if let Some(simulated_tx_list) = self.active_txs.tx_list_mut(new_tx.sender_ref()) { + if let Some(simulated_tx_list) = self.pending_orders.tx_list_mut(new_tx.sender_ref()) { if tx_list.nonce_ready(state_nonce, base_fee, nonce) { simulated_tx_list.new_pending(tx_list.ready(state_nonce, base_fee).unwrap()); } @@ -87,10 +189,17 @@ impl TxPool { let tx_list = TxList::from(new_tx.clone()); if !syncing { + // Check the next nonce for this sender in the pending orders. + // TODO(mempirate): This should compare effective gas prices, and swap if more profitable. + let pending_nonce = self.pending_orders.next_nonce(new_tx.sender()); + if pending_nonce.is_some_and(|pending| pending == nonce) { + return false; + } + // If this is the first tx for a sender, and it can be processed, simulate it and add to active. if is_next_nonce && new_tx.valid_for_block(base_fee) { TxPool::send_sim_requests_for_tx(&new_tx, db, sim_sender); - self.active_txs.put(SimulatedTxList::new(None, &tx_list)); + self.pending_orders.put_tx_list(SimulatedTxList::new(None, &tx_list)); self.mem_size = self.mem_size.saturating_add(tx_list.mem_size()); } } @@ -103,19 +212,28 @@ impl TxPool { true } - /// Validates simualted tx. If valid, fetch its TxList and save the new [SimulatedTxList] to `active_txs`. - pub fn handle_simulated(&mut self, simulated_tx: SimulatedTx) { - let Some(tx_list) = self.pool_data.get(simulated_tx.sender_ref()) else { - tracing::warn!(sender = ?simulated_tx.sender(), "Couldn't find tx list for valid simulated tx"); - return; - }; + /// Validates a simulated order. If valid, adds it to the pending orders. + pub fn handle_simulated(&mut self, order: SimulatedOrder) { + match order { + SimulatedOrder::Tx(simulated_tx) => { + let Some(tx_list) = self.pool_data.get(simulated_tx.sender_ref()) else { + tracing::warn!(sender = ?simulated_tx.sender(), "Couldn't find tx list for valid simulated tx"); + return; + }; - // Refresh active txs with the latest tx_list and simulated tx. - // TODO: probably unecassary to copy the tx_list here. - let simulated_tx_list = SimulatedTxList::new(Some(simulated_tx), tx_list); - let mem_size = simulated_tx_list.mem_size(); - self.active_txs.put(simulated_tx_list); - self.mem_size = self.mem_size.saturating_add(mem_size); + // Refresh active txs with the latest tx_list and simulated tx. + // TODO: probably unecassary to copy the tx_list here. + let simulated_tx_list = SimulatedTxList::new(Some(simulated_tx), tx_list); + let mem_size = simulated_tx_list.mem_size(); + self.pending_orders.put_tx_list(simulated_tx_list); + self.mem_size = self.mem_size.saturating_add(mem_size); + } + + SimulatedOrder::Bundle(simulated_bundle) => { + tracing::debug!(id = %simulated_bundle.id(), "Received simulated bundle"); + self.pending_orders.put_bundle(simulated_bundle); + } + } } /// Removes a transaction with sender and nonce from the pool. @@ -135,7 +253,7 @@ impl TxPool { } } - self.active_txs.forward(sender, nonce, &mut f); + self.pending_orders.forward(sender, nonce, &mut f); } pub fn remove_mined_txs<'a, T: OpTransaction + TransactionTrait + 'a>( @@ -160,7 +278,7 @@ impl TxPool { self.pool_data.remove(sender); } } - self.active_txs.forward(sender, nonce, &mut f); + self.pending_orders.forward(sender, nonce, &mut f); } } @@ -178,15 +296,42 @@ impl TxPool { // If enabled, fill the active list with non-simulated txs and send off the first tx for each sender to // simulator. if !syncing { - self.active_txs.clear(); + self.pending_orders.clear(); for (sender, tx_list) in self.pool_data.iter() { let db_nonce = db.get_nonce(*sender).unwrap(); if let Some(ready) = tx_list.ready(db_nonce, base_fee) { TxPool::send_sim_requests_for_tx(ready.peek().unwrap(), db, sim_sender); - self.active_txs.put(SimulatedTxList::new(None, tx_list)); + self.pending_orders.put_tx_list(SimulatedTxList::new(None, tx_list)); self.mem_size = self.mem_size.saturating_add(tx_list.mem_size()); } } + + // Re-add bundles from bundle_data that are still valid (nonces not yet consumed). + // Remove bundles whose transactions have already been mined. + let mut to_remove = Vec::new(); + for (id, bundle) in self.bundle_data.iter() { + // Check if all transactions in the bundle still have valid nonces + let all_valid = bundle.transactions.iter().all(|tx| { + let db_nonce = db.get_nonce(tx.sender()).unwrap(); + tx.nonce() >= db_nonce && tx.valid_for_block(base_fee) + }); + + if all_valid { + // Re-add to pending_orders + self.pending_orders.put_bundle(SimulatedBundle::new(bundle.clone())); + TxPool::send_sim_requests_for_bundle(bundle, db, sim_sender); + } else { + // Bundle is no longer valid, mark for removal + to_remove.push(*id); + } + } + + // Remove invalid bundles from persistent storage + for id in to_remove { + if let Some(bundle) = self.bundle_data.remove(&id) { + self.mem_size = self.mem_size.saturating_sub(bundle.size()); + } + } } } @@ -205,30 +350,33 @@ impl TxPool { } } - #[inline] - pub fn clone_active(&self) -> Vec { - self.active_txs.clone_txs() - } - - #[inline] - pub fn active(&self) -> &[SimulatedTxList] { - self.active_txs.txs() - } - - #[inline] - pub fn num_active_txs(&self) -> usize { - self.active_txs.num_txs() + /// Sends a simulation request for a bundle to the simulator. + fn send_sim_requests_for_bundle( + bundle: &Arc, + db: &DBFrag, + sim_sender: Option<&SendersSpine>, + ) { + if let Some(sim_sender) = sim_sender { + if let Err(error) = sim_sender.send_timeout( + SequencerToSimulator::SimulateBundleTof(bundle.clone(), db.clone()), + Duration::from_millis(10), + ) { + tracing::warn!(?error, "couldn't send simulator message"); + debug_assert!(false, "Couldn't send simulator message"); + } + } } #[inline] - pub fn active_empty(&self) -> bool { - self.active_txs.is_empty() + pub fn snapshot(&self) -> impl Iterator + '_ { + self.pending_orders.snapshot() } #[inline] pub fn clear(&mut self) { - self.active_txs.clear(); + self.pending_orders.clear(); self.pool_data.clear(); + self.bundle_data.clear(); self.mem_size = 0; } } diff --git a/based/crates/rpc/Cargo.toml b/based/crates/rpc/Cargo.toml index c827ae345..5d6ee4df2 100644 --- a/based/crates/rpc/Cargo.toml +++ b/based/crates/rpc/Cargo.toml @@ -5,6 +5,7 @@ rust-version.workspace = true version.workspace = true [dependencies] +alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rpc-types.workspace = true @@ -17,6 +18,7 @@ op-alloy-rpc-types-engine.workspace = true reqwest.workspace = true reth-optimism-payload-builder.workspace = true reth-optimism-primitives.workspace = true +reth-optimism-txpool.workspace = true reth-rpc-layer.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/based/crates/rpc/src/fabric.rs b/based/crates/rpc/src/fabric.rs index b9637fbea..ba00fae10 100644 --- a/based/crates/rpc/src/fabric.rs +++ b/based/crates/rpc/src/fabric.rs @@ -1,10 +1,11 @@ -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use alloy_eips::Decodable2718; use alloy_primitives::B256; use bop_common::{ communication::messages::{RpcError, RpcResult}, fabric::{Commitment, CommitmentRequest, FabricGatewayApiServer, FeeInfo, SignedCommitment, SlotInfoResponse}, + order::Order, p2p::VersionedMessage, telemetry::TelemetryUpdate, transaction::Transaction, @@ -24,14 +25,15 @@ impl FabricGatewayApiServer for RpcServer { #[tracing::instrument(skip_all, err, ret(level = Level::TRACE))] async fn post_commitment(&self, commitment: CommitmentRequest) -> RpcResult { let request_hash = commitment.tree_hash_root(); - let tx = Arc::new(Transaction::decode(commitment.payload.to_vec().into())?); + let tx = Transaction::decode(commitment.payload.to_vec().into())?; let tx_hash = tx.tx_hash(); let mut receiver = self.frag_receiver_spawner.subscribe(); // Send the transaction to the sequencer TelemetryUpdate::send_ref(tx.uuid, tx.to_ingested_telemetry(), &self.telemetry_producer); - let _ = self.new_order_tx.send(tx.into()); + let order = Order::from(tx); + let _ = self.new_order_tx.send(order.into()); // Wait for the transaction to be committed let commitment_future = async { diff --git a/based/crates/rpc/src/lib.rs b/based/crates/rpc/src/lib.rs index f00b2d61d..2c73aa894 100644 --- a/based/crates/rpc/src/lib.rs +++ b/based/crates/rpc/src/lib.rs @@ -1,10 +1,13 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, str::FromStr as _}; use alloy_primitives::{B256, Bytes, U64}; -use alloy_rpc_types::engine::JwtSecret; +use alloy_rpc_types::{ + engine::JwtSecret, + mev::{EthBundleHash, EthSendBundle}, +}; use axum::{Router, routing::get}; use bop_common::{ - api::{ControlApiServer, EngineApiServer, MinimalEthApiServer, OpMinerExtApiServer}, + api::{ControlApiServer, EngineApiServer, MinimalEthApiServer, MinimalMevApiServer, OpMinerExtApiServer}, communication::{ Producer, Sender, Spine, messages::{EngineApi, RpcResult}, @@ -12,26 +15,32 @@ use bop_common::{ config::GatewayArgs, db::DatabaseRead, fabric::FabricGatewayApiServer, + order::{Order, bundle::Bundle}, p2p::SignedVersionedMessage, telemetry::{TelemetryUpdate, telemetry_queue}, - time::Duration, + time::{self, Duration}, transaction::Transaction, }; use jsonrpsee::{ core::async_trait, server::{ServerBuilder, ServerConfigBuilder}, }; +use op_alloy_consensus::interop; use reth_optimism_payload_builder::config::OpDAConfig; use reth_rpc_layer::{AuthLayer, JwtAuthValidator}; use tokio::{net::TcpListener, runtime::Runtime}; use tracing::{Level, error, info, trace}; -use crate::state_stream::{StreamState, state_stream}; +use crate::{ + state_stream::{StreamState, state_stream}, + supervisor::{SuperVisorConfig, SupervisorValidator}, +}; mod engine; mod fabric; pub mod gossiper; mod state_stream; +mod supervisor; const STATE_STREAM_PATH: &str = "/state_stream"; @@ -45,7 +54,20 @@ pub fn start_rpc( let addr_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port); let addr_no_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port_no_auth); let addr_ws = SocketAddr::new(config.rpc_host.into(), config.rpc_port_ws); - let server = RpcServer::new(spine, config.sequencer_jwt(), rx_spawner, da_config); + let mut server = RpcServer::new(spine, config.sequencer_jwt(), rx_spawner, da_config); + + // Configure supervisor + if let Some(supervisor) = config.supervisor_url.as_ref() { + let safety_level = interop::SafetyLevel::from_str( + config.supervisor_safety_level.as_ref().expect("supervisor safety level is required"), + ) + .unwrap(); + let config = SuperVisorConfig { url: supervisor.clone(), safety_level }; + + let supervisor = rt.block_on(SupervisorValidator::new(&config)); + server = server.with_supervisor(supervisor); + } + rt.spawn(server.run(addr_auth, addr_no_auth, addr_ws)); } @@ -53,13 +75,15 @@ pub fn start_rpc( // TODO: timing #[derive(Debug, Clone)] struct RpcServer { - new_order_tx: Sender>, + new_order_tx: Sender, engine_timeout: Duration, engine_rpc_tx: Sender, jwt: JwtSecret, telemetry_producer: Producer, frag_receiver_spawner: tokio::sync::broadcast::Sender, da_config: OpDAConfig, + /// The cross-chain transaction validator. + supervisor: Option, } impl RpcServer { @@ -77,9 +101,16 @@ impl RpcServer { telemetry_producer: telemetry_queue().into(), frag_receiver_spawner, da_config, + supervisor: None, } } + /// Set the supervisor validator for RPC transaction validation (only for cross-chain transactions) + pub fn with_supervisor(mut self, supervisor: SupervisorValidator) -> Self { + self.supervisor = Some(supervisor); + self + } + #[tracing::instrument(skip_all, name = "rpc")] pub async fn run(self, addr_auth: SocketAddr, addr_no_auth: SocketAddr, addr_ws: SocketAddr) { info!(%addr_auth, "starting RPC server"); @@ -96,6 +127,7 @@ impl RpcServer { .await .expect("failed to create eth RPC server"); let mut module = MinimalEthApiServer::into_rpc(self.clone()); + module.merge(MinimalMevApiServer::into_rpc(self.clone())).expect("failed to merge modules"); module.merge(EngineApiServer::into_rpc(self.clone())).expect("failed to merge modules"); module.merge(ControlApiServer::into_rpc(self.clone())).expect("failed to merge modules"); module.merge(OpMinerExtApiServer::into_rpc(self.clone())).expect("failed to merge modules"); @@ -114,6 +146,7 @@ impl RpcServer { .expect("failed to create eth RPC server"); let mut module = FabricGatewayApiServer::into_rpc(self.clone()); module.merge(MinimalEthApiServer::into_rpc(self.clone())).expect("failed to merge modules"); + module.merge(MinimalMevApiServer::into_rpc(self.clone())).expect("failed to merge modules"); let server_handle_no_auth = server_no_auth.start(module); let state = StreamState { frags_tx: self.frag_receiver_spawner.clone() }; @@ -145,16 +178,55 @@ impl MinimalEthApiServer for RpcServer { async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult { trace!(?bytes, "new request"); - let tx = Arc::new(Transaction::decode(bytes)?); + let tx = Transaction::decode(bytes).inspect_err(|e| tracing::error!(?e, "failed to decode transaction"))?; + + // TODO(mempirate): in sequencer validation, this was using block_env. Is that an issue? Is this good enough? + let now = time::utils::unix_millis(); + if let Some(supervisor) = &self.supervisor { + supervisor + .validate(&tx, now as u64) + .await + .inspect_err(|e| tracing::error!(?e, "failed to validate transaction"))?; + } + TelemetryUpdate::send_ref(tx.uuid, tx.to_ingested_telemetry(), &self.telemetry_producer); let hash = tx.tx_hash(); - let _ = self.new_order_tx.send(tx.into()); + let order = Order::from(tx); + + if let Err(e) = self.new_order_tx.send(order.into()) { + tracing::error!(?e, "failed to send transaction order to sequencer"); + } Ok(hash) } } +#[async_trait] +impl MinimalMevApiServer for RpcServer { + #[tracing::instrument(skip_all, err, ret(level = Level::TRACE))] + async fn send_bundle(&self, bundle: EthSendBundle) -> RpcResult { + trace!(?bundle, "new bundle request"); + + // Convert to internal bundle type + let bundle = Bundle::::from(bundle); + let bundle_hash = bundle.bundle_hash(); + + // Validate the bundle on a separate thread to avoid blocking this one. + let bundle = tokio::task::spawn_blocking(move || bundle.try_decode()?.validate()).await??; + let order = Order::from(bundle); + + if let Err(e) = self.new_order_tx.send(order.into()) { + tracing::error!(?e, "failed to send bundle order to sequencer"); + } + + // TODO(mempirate): + // - Telemetry + + Ok(EthBundleHash { bundle_hash }) + } +} + #[async_trait] impl ControlApiServer for RpcServer { async fn heartbeat(&self) -> RpcResult<()> { diff --git a/based/crates/rpc/src/supervisor.rs b/based/crates/rpc/src/supervisor.rs new file mode 100644 index 000000000..0e2f01950 --- /dev/null +++ b/based/crates/rpc/src/supervisor.rs @@ -0,0 +1,61 @@ +use alloy_consensus::Transaction as _; +use alloy_primitives::B256; +use bop_common::transaction::Transaction; +use op_alloy_consensus::interop::SafetyLevel; +use reqwest::Url; +use reth_optimism_txpool::supervisor::{ + ExecutingDescriptor, InteropTxValidatorError, SupervisorClient, parse_access_list_items_to_inbox_entries, +}; +use tracing::warn; + +#[derive(Clone, Debug)] +pub struct SuperVisorConfig { + pub url: Url, + pub safety_level: SafetyLevel, +} + +#[derive(Debug, Clone)] +pub(crate) struct SupervisorValidator { + client: SupervisorClient, +} + +impl SupervisorValidator { + pub(crate) async fn new(config: &SuperVisorConfig) -> Self { + let client = SupervisorClient::builder(config.url.clone()).minimum_safety(config.safety_level).build().await; + Self { client } + } + + /// Validates a cross-chain transaction. + pub(crate) async fn validate(&self, tx: &Transaction, timestamp: u64) -> Result<(), InteropTxValidatorError> { + let Some(access_list) = tx.access_list() else { + return Ok(()); + }; + + let inbox_entries = + parse_access_list_items_to_inbox_entries(access_list.iter()).copied().collect::>(); + + let descriptor = ExecutingDescriptor::new(timestamp, None); + + if let Err(err) = self.validate_messages(inbox_entries.as_slice(), descriptor).await { + // TODO: Deal with reconnects. This will require `&mut self` here so it's going to be difficult in the RPC + // context. Maybe the validator should be a separate actor. + warn!(?err, ?tx, "Cross-chain transaction rejected"); + // It's possible that transaction invalid now, but would be valid later. + // We should keep limited queue for transactions that could become valid. + // We should have the limit to ensure that builder won't get overwhelmed. + return Err(err); + } + + Ok(()) + } +} + +impl SupervisorValidator { + pub async fn validate_messages( + &self, + inbox_entries: &[B256], + executing_descriptor: ExecutingDescriptor, + ) -> Result<(), InteropTxValidatorError> { + self.client.check_access_list(inbox_entries, executing_descriptor).await + } +} diff --git a/based/crates/sequencer/Cargo.toml b/based/crates/sequencer/Cargo.toml index e597fa52a..c019dd56b 100644 --- a/based/crates/sequencer/Cargo.toml +++ b/based/crates/sequencer/Cargo.toml @@ -6,8 +6,8 @@ rust-version.workspace = true version.workspace = true [features] -shmem = ["bop-common/shmem"] default = [] +shmem = ["bop-common/shmem"] [dependencies] alloy-consensus.workspace = true diff --git a/based/crates/sequencer/src/block_sync/mock_fetcher.rs b/based/crates/sequencer/src/block_sync/mock_fetcher.rs index a334b997b..3aaa65586 100644 --- a/based/crates/sequencer/src/block_sync/mock_fetcher.rs +++ b/based/crates/sequencer/src/block_sync/mock_fetcher.rs @@ -12,6 +12,7 @@ use bop_common::{ }, config::MockMode, db::{DBFrag, DatabaseRead}, + order::Order, signing::ECDSASigner, time::{Duration, Instant, utils::vsync_busy}, transaction::Transaction, @@ -193,7 +194,7 @@ impl MockFetcher { let tx = OpTxEnvelope::Eip1559(signed_tx); let hash = tx.tx_hash(); let envelope = tx.encoded_2718().into(); - let tx = Arc::new(Transaction::new(tx, from.address, envelope)); + let tx = Order::from(Transaction::new(tx, from.address, envelope)); connections.send(tx); hash } @@ -244,7 +245,7 @@ impl MockFetcher { .unwrap_or_default(); connections.send(fcu); for t in txs_for_pool { - connections.send(t); + connections.send(Order::from(t)); Duration::from_millis(10).sleep(); } @@ -325,13 +326,13 @@ impl MockFetcher { let curt = Instant::now(); connections.send(fcu); for t in txs.iter().take(txs.len() / 10) { - connections.send(t.clone()); + connections.send(Order::from(t.clone())); } if txs.len() < *max_txs { // if we're going to be fetching more, let's send all the rest for t in txs.iter().skip(txs.len() / 10) { - connections.send(t.clone()); + connections.send(Order::from(t.clone())); // Duration::from_millis(20).sleep(); } let blocks: Vec = self.executor.block_on(async { @@ -349,7 +350,7 @@ impl MockFetcher { let t_per_tx = *send_duration / txs.len() * 10usize / 9usize; for t in txs.iter().skip(txs.len() / 10) { vsync_busy(Some(t_per_tx), || { - connections.send(t.clone()); + connections.send(Order::from(t.clone())); }) } } diff --git a/based/crates/sequencer/src/block_sync/mod.rs b/based/crates/sequencer/src/block_sync/mod.rs index 4bc1a0e6a..62e4dcc31 100644 --- a/based/crates/sequencer/src/block_sync/mod.rs +++ b/based/crates/sequencer/src/block_sync/mod.rs @@ -8,7 +8,7 @@ use bop_common::{ metrics::{Counter, Histogram, Metric, MetricsUpdate}, telemetry::{ self, TelemetryUpdate, - order::{IncludedInFrag, Ingested}, + order::{Ingested, TransactionInclusion}, }, time::BlockSyncTimers, }; @@ -223,7 +223,7 @@ impl BlockSync { TelemetryUpdate::send( uuid, - telemetry::Telemetry::Tx(telemetry::Tx::Included(IncludedInFrag { + telemetry::Telemetry::Tx(telemetry::Tx::Included(TransactionInclusion { frag, id_in_frag: id as u16, ..Default::default() diff --git a/based/crates/sequencer/src/config.rs b/based/crates/sequencer/src/config.rs index 6dd5cd9af..31d4f6c14 100644 --- a/based/crates/sequencer/src/config.rs +++ b/based/crates/sequencer/src/config.rs @@ -33,7 +33,6 @@ pub struct SequencerConfig { pub simulate_tof_in_pools: bool, /// If true will commit locally sequenced blocks to the db before getting payload from the engine api. pub commit_sealed_frags_to_db: bool, - pub supervisor: Option, pub da_config: OpDAConfig, } @@ -47,7 +46,6 @@ impl From<&GatewayArgs> for SequencerConfig { simulate_tof_in_pools: false, evm_config: OpEvmConfig::new(args.chain.clone(), Default::default()), commit_sealed_frags_to_db: args.commit_sealed_frags_to_db, - supervisor: args.supervisor_url.as_ref().map(|_| SuperVisorConfig::from(args)), da_config: args.da_config.clone(), } } diff --git a/based/crates/sequencer/src/context.rs b/based/crates/sequencer/src/context.rs index 91062f92f..b086df232 100644 --- a/based/crates/sequencer/src/context.rs +++ b/based/crates/sequencer/src/context.rs @@ -10,6 +10,7 @@ use bop_common::{ custom_v4::OpExecutionPayloadEnvelopeV4Patch, debug_panic, metrics::{Gauge, Metric, MetricsUpdate, metrics_queue}, + order::Order, p2p::{FragV0, SealV0, StateUpdate}, shared::SharedState, telemetry::{TelemetryUpdate, telemetry_queue}, @@ -252,31 +253,31 @@ impl SequencerContext { sorting_data.send_finished_telemetry(); info!( frag_id = frag_seq.next_seq, - txs = sorting_data.txs.len(), + txs = sorting_data.transactions.len(), frag_time =% sorting_data.start_t.elapsed(), "sealing frag", ); - self.shared_state.as_mut().commit_txs(sorting_data.txs.iter_mut()); - self.tx_pool.remove_mined_txs(sorting_data.txs.iter().map(|t| (t.sender_ref(), t)), &mut self.telemetry); + self.shared_state.as_mut().commit_txs(sorting_data.transactions.iter_mut()); + self.tx_pool + .remove_mined_txs(sorting_data.transactions.iter().map(|t| (t.sender_ref(), t)), &mut self.telemetry); let (frag, maybe_state) = frag_seq.apply_sorted_frag(sorting_data, self); (frag, maybe_state, SortingData::new(frag_seq, self)) } } impl + Display> + StorageRootProvider> SequencerContext { - pub fn handle_tx(&mut self, tx: Arc, senders: &SendersSpine) { - if tx.is_deposit() { - self.deposits.push_back(tx); - return; - } - if self.tx_pool.handle_new_tx( - tx.clone(), + pub fn handle_order(&mut self, order: Order, senders: &SendersSpine) { + let uuid = order.uuid(); + let telemetry = order.pool_telemetry(); + + if self.tx_pool.handle_order( + order, self.shared_state.as_ref(), self.base_fee(), false, self.config.simulate_tof_in_pools.then_some(senders), ) { - TelemetryUpdate::send(tx.uuid, tx.to_added_to_pool_telemetry(), &mut self.telemetry); + TelemetryUpdate::send_batch(uuid, telemetry, &mut self.telemetry); } } @@ -320,7 +321,7 @@ impl + Display> + Storage // Apply must include sorting.apply_block_start_to_state(self, simulator_evm_block_params).expect("shouldn't fail"); - self.tx_pool.remove_mined_txs(sorting.txs.iter().map(|t| (t.sender_ref(), t)), &mut self.telemetry); + self.tx_pool.remove_mined_txs(sorting.transactions.iter().map(|t| (t.sender_ref(), t)), &mut self.telemetry); (seq, sorting) } @@ -505,7 +506,7 @@ impl SequencerContext { ); // Completely wipe active txs as they may contain valid nonces with out of date sim results. - self.tx_pool.active_txs.clear(); + self.tx_pool.pending_orders.clear(); self.tx_pool.remove_mined_txs(block.transactions_with_sender(), &mut self.telemetry); if let Some(base_fee) = block.base_fee_per_gas { diff --git a/based/crates/sequencer/src/lib.rs b/based/crates/sequencer/src/lib.rs index 0dfa47ed7..42de757a7 100644 --- a/based/crates/sequencer/src/lib.rs +++ b/based/crates/sequencer/src/lib.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Instant}; +use std::time::Instant; use alloy_consensus::BlockHeader; use alloy_eips::eip7685::RequestsOrHash; @@ -15,11 +15,11 @@ use bop_common::{ custom_v4::OpExecutionPayloadEnvelopeV4Patch, db::DatabaseWrite, metrics::{Gauge, Metric, MetricsUpdate}, + order::{Order, PendingOrder, SimulatedOrder}, p2p::{EnvV0, VersionedMessage, VersionedMessageWithState}, shared::SharedState, telemetry::{self, Telemetry, TelemetryUpdate, system::SystemNotification}, time::{Duration, Repeater}, - transaction::Transaction, typedefs::{BlockSyncMessage, DatabaseRef}, }; use bop_db::DatabaseRead; @@ -39,7 +39,6 @@ pub mod config; mod context; pub mod simulator; pub(crate) mod sorting; -mod supervisor; pub use config::SequencerConfig; use context::SequencerContext; @@ -73,17 +72,14 @@ pub struct Sequencer { state: SequencerState, data: SequencerContext, heartbeat: Repeater, - supervisor: Option, } impl Sequencer { pub fn new(db: Db, shared_state: SharedState, config: SequencerConfig) -> Self { - let supervisor = config.supervisor.as_ref().map(supervisor::SupervisorValidator::from); Self { state: SequencerState::default(), data: SequencerContext::new(db, shared_state, config), heartbeat: Repeater::every(Duration::from_secs(2)), - supervisor, } } } @@ -111,16 +107,8 @@ where let use_tx_pool = self.data.payload_attributes.no_tx_pool.is_none_or(|no_tx_pool| !no_tx_pool); if use_tx_pool { // handle new transaction - connections.receive_for(Duration::from_millis(10), |msg, senders| { - if self.data.timestamp() != 0 && - self.supervisor - .as_ref() - .is_some_and(|validator| !validator.is_valid(&msg, self.data.timestamp())) - { - return; - } - - self.state.handle_new_tx(msg, &mut self.data, senders); + connections.receive_for(Duration::from_millis(10), |msg: Order, senders| { + self.state.handle_new_order(msg, &mut self.data, senders) }); } @@ -381,7 +369,7 @@ where "received FCU when Sorting. Sending already Fragged txs back to the pools and syncing to the new head." ); for tx in frag_seq.txs.into_iter().skip(frag_seq.n_force_include_txs) { - ctx.handle_tx(tx.tx, senders); + ctx.handle_order(Order::Tx(tx.tx), senders); } let start = ctx.db.head_block_number().expect("couldn't get db head block number"); let stop = start + 1; @@ -505,24 +493,26 @@ where } } - /// Sends a new transaction to the tx pool. - /// If we are sorting, we pass Some(senders) to the tx pool so it can send top-of-frag simulations. - fn handle_new_tx(&mut self, tx: Arc, ctx: &mut SequencerContext, senders: &SendersSpine) { + /// Sends a new order to the order pool. + fn handle_new_order(&mut self, order: Order, ctx: &mut SequencerContext, senders: &SendersSpine) { + tracing::trace!(type = order.ty(), "Received new order"); + // Add the (unsimulated) order to the TOF snapshot. + // TODO(mempirate): This might cause issues because it hasn't been simulated, where do we add sim info? if let SequencerState::Sorting(_, sorting_data) = self { - sorting_data - .tof_snapshot - .push_front(bop_common::transaction::SimulatedTxList { current: None, pending: tx.clone().into() }); + sorting_data.tof_snapshot.push_front(PendingOrder::from(order.clone())); } - ctx.handle_tx(tx, senders); + + ctx.handle_order(order, senders); } /// Processes transaction simulation results from the simulator actor. /// /// Handles both block transaction simulations during sorting and - /// transaction pool simulations for future inclusion. + /// order pool simulations for future inclusion. fn handle_sim_result(mut self, result: SimulatorToSequencer, data: &mut SequencerContext) -> Self { - let (sender, nonce) = result.sender_info; let state_id = result.state_id; + let sender_nonces = result.sender_nonces; + let simtime = result.simtime; match result.msg { SimulatorToSequencerMsg::Tx(simulated_tx) => { @@ -534,22 +524,60 @@ where if !sort_data.is_valid(state_id) { return self; } + data.timers.handle_sim.start(); - sort_data.handle_sim(simulated_tx, sender, data.base_fee(), simtime); + sort_data.handle_sim(simulated_tx.map(SimulatedOrder::Tx), sender_nonces, data.base_fee(), simtime); data.timers.handle_sim.stop(); } SimulatorToSequencerMsg::TxPoolTopOfFrag(simulated_tx) => { + let (sender, nonce) = sender_nonces.first().expect("no sender nonces"); + match simulated_tx { - Ok(res) if data.shared_state.as_ref().is_valid(state_id) => data.tx_pool.handle_simulated(res), + Ok(res) if data.shared_state.as_ref().is_valid(state_id) => { + data.tx_pool.handle_simulated(SimulatedOrder::Tx(res)) + } Ok(_) => { // No-op if the simulation is on a different fragment. // We would have already re-sent the tx for sim on the correct fragment. } Err(_e) => { - data.tx_pool.remove(&sender, nonce, &mut data.telemetry); + data.tx_pool.remove(sender, *nonce, &mut data.telemetry); } } } + SimulatorToSequencerMsg::Bundle(simulated_bundle) => { + let SequencerState::Sorting(_, sort_data) = &mut self else { + return self; + }; + + // handle sim on wrong state + if !sort_data.is_valid(state_id) { + return self; + } + + data.timers.handle_sim.start(); + sort_data.handle_sim( + simulated_bundle.map(SimulatedOrder::Bundle), + sender_nonces, + data.base_fee(), + simtime, + ); + data.timers.handle_sim.stop(); + } + SimulatorToSequencerMsg::BundleTopOfFrag(simulated_bundle) => match simulated_bundle { + Ok(res) if data.shared_state.as_ref().is_valid(state_id) => { + data.tx_pool.handle_simulated(SimulatedOrder::Bundle(res)) + } + Ok(_) => { + // No-op if the simulation is on a different fragment. + // We would have already re-sent the bundle for sim on the correct fragment. + } + Err(_e) => { + for (sender, nonce) in sender_nonces { + data.tx_pool.remove(&sender, nonce, &mut data.telemetry); + } + } + }, } self } diff --git a/based/crates/sequencer/src/simulator.rs b/based/crates/sequencer/src/simulator.rs index c06baa6ec..e67835e8e 100644 --- a/based/crates/sequencer/src/simulator.rs +++ b/based/crates/sequencer/src/simulator.rs @@ -10,16 +10,20 @@ use bop_common::{ messages::{SequencerToSimulator, SimulationError, SimulatorToSequencer, SimulatorToSequencerMsg}, }, db::{DBFrag, DBSorting, DatabaseRead, State}, + order::bundle::{SimulatedBundle, ValidatedBundle}, time::{Duration, Instant, Nanos}, transaction::{SimulatedTx, Transaction}, typedefs::*, utils::last_part_of_typename, }; -use op_revm::OpSpecId; +use op_revm::{OpHaltReason, OpSpecId}; use reth_evm::{ConfigureEvm, Evm, EvmEnv, execute::ProviderError}; use reth_optimism_evm::{OpEvm, OpEvmConfig}; use reth_optimism_forks::OpHardfork; -use revm::context::{Block, DBErrorMarker}; +use revm::context::{ + Block, DBErrorMarker, + result::{ExecutionResult, ResultVecAndState}, +}; use revm_inspector::NoOpInspector; use revm_primitives::{Address, U256}; @@ -84,6 +88,19 @@ impl< simulate_tx_inner(tx, evm, regolith_active, allow_zero_payment, allow_revert) } + /// Simulates a bundle at the state of the `db` parameter. + pub fn simulate_bundle( + bundle: Arc, + db: SimulateTxDb, + evm: &mut OpEvm, NoOpInspector, reth_evm::precompiles::PrecompilesMap>, + regolith_active: bool, + allow_zero_payment: bool, + allow_revert: bool, + ) -> Result { + let _ = std::mem::replace(evm.db_mut(), State::new(db)); + simulate_bundle_inner(bundle, evm, regolith_active, allow_zero_payment, allow_revert) + } + /// Updates internal EVM environments with new configuration #[inline] pub fn update_evm_environments(&mut self, evm_block_params: EvmEnv) { @@ -131,6 +148,85 @@ pub fn simulate_tx_inner( Ok(SimulatedTx::new(tx, result_and_state, payment, deposit_nonce, sim_start.elapsed())) } +/// Simulates a bundle atomically, committing state changes between transactions +/// so each subsequent transaction sees the effects of previous ones. +/// +/// The EVM's database must be a `State` wrapper. Commits go to State's +/// in-memory cache, NOT to the underlying database. +pub fn simulate_bundle_inner( + bundle: Arc, + evm: &mut OpEvm, NoOpInspector, reth_evm::precompiles::PrecompilesMap>, + regolith_active: bool, + allow_zero_payment: bool, + allow_revert: bool, +) -> Result +where + Db::Error: Send + Sync + 'static + DBErrorMarker + std::error::Error + Into + Debug + Display, +{ + let coinbase = evm.block().beneficiary; + + // Get initial coinbase balance BEFORE any transactions + let start_balance = balance_from_db(evm.db_mut(), coinbase); + let mut intermediate_balance = start_balance; + + // The post-state of the bundle. + let mut post_state = ResultVecAndState::, EvmState>::new( + Vec::with_capacity(bundle.transactions.len()), + EvmState::default(), + ); + + let mut simulated = Vec::with_capacity(bundle.transactions.len()); + + for tx in bundle.transactions.iter() { + let sim_start = Nanos::now(); + let deposit_nonce = (tx.is_deposit() && regolith_active).then(|| nonce_from_db(evm.db_mut(), tx.sender())); + + let result_and_state = + evm.transact_raw(tx.to_op_tx_env()).map_err(|e| SimulationError::EvmError(format!("{e:?}")))?; + + if !allow_revert && !result_and_state.result.is_success() { + return Err(SimulationError::RevertWithDisallowedRevert); + } + + // Update intermediate balance and payment. + // NOTE: If a transaction does not touch the coinbase, this falls back to the intermediate balance (instead of + // 0, which would be incorrect) + let end_balance = + result_and_state.state.get(&coinbase).map(|a| a.info.balance).unwrap_or_else(|| intermediate_balance); + let payment = end_balance.saturating_sub(intermediate_balance); + intermediate_balance = end_balance; + + // Commit to State's in-memory cache (not the underlying db) + // so subsequent transactions see these state changes + // TODO(mempirate): validate that this is actually the case and we're not committing to underlying db + evm.db_mut().commit_ref(&result_and_state.state); + + post_state.result.push(result_and_state.result.clone()); + post_state.state.extend(result_and_state.state.clone()); + + simulated.push(SimulatedTx::new( + tx.clone().into(), + result_and_state, + payment, + deposit_nonce, + sim_start.elapsed(), + )); + } + + // Calculate total payment AFTER all transactions + let end_balance = balance_from_db(evm.db_mut(), coinbase); + let total_payment = end_balance.saturating_sub(start_balance); + + if !allow_zero_payment && total_payment == U256::ZERO { + return Err(SimulationError::ZeroPayment); + } + + let mut simulated_bundle = SimulatedBundle::new(bundle); + simulated_bundle.set_simulation_results(simulated, total_payment, post_state); + + Ok(simulated_bundle) +} + #[inline] fn nonce_from_db(db: &mut impl Database, address: Address) -> u64 { db.basic(address).ok().flatten().map(|a| a.nonce).unwrap_or_default() @@ -161,7 +257,7 @@ where }); connections.receive(|msg: SequencerToSimulator, senders| { - let (sender, nonce, state_id) = msg.sim_info(); + let (state_id, sender_nonces) = msg.sim_info(); let curt = Instant::now(); let msg = match msg { SequencerToSimulator::SimulateTx(tx, db) => SimulatorToSequencerMsg::Tx(Self::simulate_transaction( @@ -182,9 +278,29 @@ where self.allow_reverts, )) } + SequencerToSimulator::SimulateBundle(bundle, dbsorting) => { + SimulatorToSequencerMsg::Bundle(Self::simulate_bundle( + bundle, + dbsorting, + &mut self.evm_sorting, + self.regolith_active, + true, + self.allow_reverts, + )) + } + SequencerToSimulator::SimulateBundleTof(bundle, dbfrag) => { + SimulatorToSequencerMsg::BundleTopOfFrag(Self::simulate_bundle( + bundle, + dbfrag, + &mut self.evm_tof, + self.regolith_active, + true, + self.allow_reverts, + )) + } }; let _ = senders.send_timeout( - SimulatorToSequencer::new((sender, nonce), state_id, curt.elapsed(), msg), + SimulatorToSequencer::new(sender_nonces, state_id, curt.elapsed(), msg), Duration::from_millis(10), ); }); diff --git a/based/crates/sequencer/src/sorting/frag_sequence.rs b/based/crates/sequencer/src/sorting/frag_sequence.rs index 5c184c4ae..f0bd5d480 100644 --- a/based/crates/sequencer/src/sorting/frag_sequence.rs +++ b/based/crates/sequencer/src/sorting/frag_sequence.rs @@ -79,15 +79,15 @@ impl FragSequence { self.payment += in_sort.payment(); let uuid = in_sort.uuid; - let mut txs = Vec::with_capacity(in_sort.txs.len()); - let mut receipts = HashMap::with_capacity(in_sort.txs.len()); - let mut balances = HashMap::with_capacity(in_sort.txs.len()); + let mut txs = Vec::with_capacity(in_sort.transactions.len()); + let mut receipts = HashMap::with_capacity(in_sort.transactions.len()); + let mut balances = HashMap::with_capacity(in_sort.transactions.len()); let mut in_sort_da_used = 0; - let in_sort_txs = in_sort.txs.len(); + let in_sort_txs = in_sort.transactions.len(); - for tx in in_sort.txs { + for tx in in_sort.transactions { self.gas_used += tx.gas_used(); - in_sort_da_used += tx.tx.estimated_tx_compressed_size(); + in_sort_da_used += tx.estimated_da(); txs.push(Transaction::from(tx.tx.encode().to_vec())); receipts.insert( @@ -167,8 +167,8 @@ mod tests { use alloy_primitives::Bytes; use alloy_rpc_types::engine::PayloadAttributes; use bop_common::{ - communication::Spine, db::DBFrag, shared::SharedState, time::Duration, transaction::Transaction, - utils::initialize_test_tracing, + communication::Spine, db::DBFrag, order::SimulatedOrder, shared::SharedState, time::Duration, + transaction::Transaction, utils::initialize_test_tracing, }; use bop_db::AlloyDB; use op_alloy_rpc_types_engine::OpPayloadAttributes; @@ -222,7 +222,6 @@ mod tests { evm_config: evm_config.clone(), simulate_tof_in_pools: false, commit_sealed_frags_to_db: false, - supervisor: None, da_config: OpDAConfig::default(), }; @@ -289,14 +288,14 @@ mod tests { let new_state = bop_common::db::State::new(db); let _ = std::mem::replace(&mut evm.ctx_mut().db(), &new_state); let result = simulate_tx_inner(tx, evm, true, true, true).unwrap(); - sorting_db.apply_tx(result); + sorting_db.apply_order(SimulatedOrder::Tx(result)); } // Apply the frag of non-must include txs let (_frag, _, _sorting_db) = ctx.seal_frag(sorting_db, &mut seq); // Seal the block - let (_seal, payload) = ctx.seal_block(seq); + let (_seal, payload) = ctx.seal_block(seq, None); assert_eq!(block.hash_slow(), payload.execution_payload.payload_inner.payload_inner.payload_inner.block_hash); } } diff --git a/based/crates/sequencer/src/sorting/mod.rs b/based/crates/sequencer/src/sorting/mod.rs index 84629473a..69aa542ae 100644 --- a/based/crates/sequencer/src/sorting/mod.rs +++ b/based/crates/sequencer/src/sorting/mod.rs @@ -3,7 +3,7 @@ use std::{ ops::{Deref, DerefMut}, }; -use bop_common::transaction::{SimulatedTx, SimulatedTxList}; +use bop_common::order::{PendingOrder, SimulatedOrder}; use revm_primitives::{Address, U256}; use tracing::debug; @@ -14,11 +14,11 @@ pub(crate) use frag_sequence::FragSequence; #[derive(Clone, Debug, Default)] pub struct ActiveOrders { - orders: VecDeque, + orders: VecDeque, } impl ActiveOrders { - pub fn new(mut orders: Vec, fifo_ordering: bool) -> Self { + pub fn new(mut orders: Vec, fifo_ordering: bool) -> Self { if fifo_ordering { // NOTE: This function is used to populate the `tof_snaphost`, where a new transaction // is pushed front on a `VecDeque`. Instead, a new active transaction in the tx pool @@ -42,8 +42,9 @@ impl ActiveOrders { self.orders.len() } + /// Returns the total available value of the orders, i.e. the sum of the simulated payments of the orders. pub fn available_value(&self) -> U256 { - self.orders.iter().map(|t| t.current.as_ref().map(|tx| tx.payment).unwrap_or_default()).sum() + self.orders.iter().map(|t| t.payment().unwrap_or_default()).sum() } /// Removes all pending txs for a sender list. @@ -52,38 +53,74 @@ impl ActiveOrders { if self.is_empty() { return; } - for i in (0..self.len()).rev() { - let order = &mut self.orders[i]; - debug_assert_ne!(order.sender(), Address::default(), "should never have an order with default sender"); - if order.sender() == sender { - if order.pop(base_fee) { - self.orders.swap_remove_back(i).unwrap(); + let len = self.orders.len(); + let mut to_remove = Vec::new(); + + for (i, order) in self.orders.iter_mut().rev().enumerate() { + // Get the actual index of the order in the deque. + let index = len - i - 1; + + match order { + PendingOrder::Tx(list) => { + if list.sender() == sender && list.pop(base_fee) { + to_remove.push(index); + } + } + PendingOrder::Bundle(bundle) => { + if bundle.has_sender(sender) { + // TODO: Needs any additional checks? + to_remove.push(index); + } } - return; } } - unreachable!("this should never happen"); + + for index in to_remove { + self.orders.swap_remove_back(index).unwrap(); + } } - pub fn put(&mut self, tx: SimulatedTx, fifo_ordering: bool) { + pub fn put(&mut self, order: SimulatedOrder, fifo_ordering: bool) { let mut id = self.orders.len(); if !fifo_ordering { - let payment = tx.payment; - let sender = tx.sender(); - for (i, order) in self.orders.iter_mut().enumerate().rev() { - if order.sender() == sender { - order.put(tx); - return; + let payment = order.payment(); + + match order { + SimulatedOrder::Tx(ref tx) => { + let sender = tx.sender(); + for (i, order) in self.orders.iter_mut().enumerate().rev() { + let Some(order) = order.as_tx_list_mut() else { + return; + }; + + if order.sender() == sender { + order.put(tx.clone()); + return; + } + + if payment < order.payment() { + id = i; + } + } } - if payment < order.payment() { - id = i; + SimulatedOrder::Bundle(_) => { + for (i, order) in self.orders.iter_mut().enumerate().rev() { + let Some(order) = order.as_bundle() else { + return; + }; + + if payment < order.payment() { + id = i; + } + } } } } + // not found so we insert it at the id corresponding to the payment - self.orders.insert(id, SimulatedTxList::from(tx)) + self.orders.insert(id, PendingOrder::from(order)) } /// Checks whether we have enough gas remaining for order at id. @@ -149,7 +186,7 @@ impl ActiveOrders { } impl Deref for ActiveOrders { - type Target = VecDeque; + type Target = VecDeque; fn deref(&self) -> &Self::Target { &self.orders diff --git a/based/crates/sequencer/src/sorting/sorting_data.rs b/based/crates/sequencer/src/sorting/sorting_data.rs index d00102a42..1950f0e39 100644 --- a/based/crates/sequencer/src/sorting/sorting_data.rs +++ b/based/crates/sequencer/src/sorting/sorting_data.rs @@ -11,6 +11,7 @@ use bop_common::{ }, db::{DBSorting, state::ensure_create2_deployer}, metrics::{Counter, Gauge, Histogram, Metric, MetricsUpdate}, + order::{PendingOrder, SimulatedOrder}, telemetry::{Telemetry, TelemetryUpdate}, time::{Duration, Instant}, transaction::{SimulatedTx, Transaction}, @@ -26,7 +27,6 @@ use reth_evm::{ use reth_optimism_evm::OpBlockExecutionError; use reth_optimism_payload_builder::config::OpDAConfig; use revm_primitives::{Address, U256}; -use tracing::{debug, trace}; use uuid::Uuid; use super::FragSequence; @@ -100,7 +100,7 @@ pub struct SortingData { pub da_used: u64, pub da_footprint_gas_scalar: Option, pub payment: U256, - pub txs: Vec, + pub transactions: Vec, /// Sort frag until, and then commit pub until: Instant, /// We wait until these are back before we apply the next @@ -118,7 +118,7 @@ pub struct SortingData { /// While sim results come back, we keep track of the most valuable one here. /// If when all results are back (i.e. `in_flight_sims == 0`) this is Some, /// we apply it to the `db` and send off the next batch of sims. - pub next_to_be_applied: Option, + pub next_to_be_applied: Option, pub start_t: Instant, @@ -139,12 +139,12 @@ impl SortingData { let tof_snapshot = if data.payload_attributes.no_tx_pool.unwrap_or_default() { ActiveOrders::empty() } else { - ActiveOrders::new(data.tx_pool.clone_active(), data.config.fifo_ordering) + ActiveOrders::new(data.tx_pool.snapshot().collect(), data.config.fifo_ordering) }; let db = DBSorting::new(data.shared_state.as_ref().clone()); let _ = ensure_create2_deployer(data.chain_spec().clone(), data.timestamp(), &mut db.db.write()); let uuid = Uuid::new_v4(); - let mut telemetry_producer = data.telemetry; + let mut telemetry_producer = data.telemetry.clone(); TelemetryUpdate::send( uuid, Telemetry::Frag(bop_common::telemetry::Frag::SorterStart { @@ -155,7 +155,7 @@ impl SortingData { &mut telemetry_producer, ); - debug!(da_remaining = seq.da_remaining, gas_remaining = seq.gas_remaining, "new sorting data created"); + tracing::debug!(da_remaining = seq.da_remaining, gas_remaining = seq.gas_remaining, "new sorting data created"); let since_last_seal = data.last_seal_time.elapsed().into(); let adjusted_frag_time = @@ -175,7 +175,7 @@ impl SortingData { da_used: seq.da_used, da_footprint_gas_scalar: data.da_footprint_gas_scalar, fifo_ordering: data.config.fifo_ordering, - txs: vec![], + transactions: vec![], start_t: Instant::now(), telemetry: Default::default(), uuid, @@ -187,11 +187,11 @@ impl SortingData { impl SortingData { pub fn is_empty(&self) -> bool { - self.txs.is_empty() + self.transactions.is_empty() } pub fn gas_used(&self) -> u64 { - self.txs.iter().map(|t| t.gas_used()).sum() + self.transactions.iter().map(|t| t.gas_used()).sum() } pub fn payment(&self) -> U256 { @@ -205,18 +205,19 @@ impl SortingData { /// Handles the result of a simulation. `simulated_tx` simulated_at_id should be pre-verified. pub fn handle_sim( &mut self, - simulated_tx: SimulationResult, - sender: Address, + simulated_order: SimulationResult, + sender_nonces: Vec<(Address, u64)>, base_fee: u64, simtime: Duration, ) { self.in_flight_sims -= 1; self.telemetry.tot_sim_time += simtime; - trace!("handling sender {sender}"); + tracing::trace!(?sender_nonces, "handling simulation result"); + // handle errored sim - let Ok(simulated_tx) = simulated_tx.inspect_err(|e| { - tracing::trace!("error {e} for tx: {}", sender); + let Ok(order) = simulated_order.inspect_err(|e| { + tracing::trace!(?sender_nonces, error = ?e, "error handling simulation result"); // Send metric for simulation error MetricsUpdate::send( self.uuid, @@ -224,14 +225,18 @@ impl SortingData { &mut self.metrics_producer, ); }) else { - self.tof_snapshot.remove_from_sender(sender, base_fee); + for (sender, _) in &sender_nonces { + self.tof_snapshot.remove_from_sender(*sender, base_fee); + } + self.telemetry.n_sims_errored += 1; return; }; - trace!("succesful for nonce {}", simulated_tx.nonce_ref()); - if self.gas_remaining < simulated_tx.gas_used() { - self.tof_snapshot.remove_from_sender(sender, base_fee); + if self.gas_remaining < order.gas_used() { + for (sender, _) in &sender_nonces { + self.tof_snapshot.remove_from_sender(*sender, base_fee); + } return; } self.telemetry.n_sims_succesful += 1; @@ -250,13 +255,14 @@ impl SortingData { &mut self.metrics_producer, ); - let tx_to_put_back = if simulated_tx.gas_used() < self.gas_remaining && - self.next_to_be_applied.as_ref().is_none_or(|t| t.payment < simulated_tx.payment && !self.fifo_ordering) + let tx_to_put_back = if order.gas_used() < self.gas_remaining && + self.next_to_be_applied.as_ref().is_none_or(|t| t.payment() < order.payment() && !self.fifo_ordering) { - self.next_to_be_applied.replace(simulated_tx) + self.next_to_be_applied.replace(order) } else { - Some(simulated_tx) + Some(order) }; + if let Some(tx) = tx_to_put_back { self.tof_snapshot.put(tx, self.fifo_ordering) } @@ -276,8 +282,8 @@ impl SortingData { Telemetry::Frag(bop_common::telemetry::Frag::SorterFinish { success: true, payment: self.payment.into(), - best_order_value: self.txs.iter().map(|t| t.payment).max().unwrap_or_default().into(), - n_txs: self.txs.len(), + best_order_value: self.transactions.iter().map(|t| t.payment).max().unwrap_or_default().into(), + n_txs: self.transactions.len(), gas_used: self.gas_used(), }), &mut self.telemetry_producer, @@ -286,7 +292,7 @@ impl SortingData { // Send metrics for fragment size and duration MetricsUpdate::send( self.uuid, - Metric::SetGauge(Gauge::GatewayFragTxCount, self.txs.len() as f64), + Metric::SetGauge(Gauge::GatewayFragTxCount, self.transactions.len() as f64), &mut self.metrics_producer, ); @@ -330,7 +336,7 @@ impl SortingData { found = true; debug_assert!(res.tx.is_deposit(), "somehow found a valid sim that wasn't a deposit"); - self.apply_tx(res); + self.apply_order(SimulatedOrder::Tx(res)); }); } } @@ -361,17 +367,45 @@ impl SortingData { i -= 1; continue; } - let order = self.tof_snapshot[i].next_to_sim(); - debug_assert!(order.is_some(), "Unsimmable TxList should have been cleared previously"); - let tx_to_sim = order.unwrap(); - senders.send(SequencerToSimulator::SimulateTx(tx_to_sim, self.state())); - self.in_flight_sims += 1; - self.telemetry.n_sims_sent += 1; - sims_sent += 1; - if i == 0 { - break; + + let state = self.state(); + // Make a copy of i to avoid closure capture issues. + let k = i; + + let mut postprocess = || { + self.in_flight_sims += 1; + self.telemetry.n_sims_sent += 1; + sims_sent += 1; + // If we've reached the end of the snapshot, return true to break the loop. + if i == 0 { + return true; + } + + i -= 1; + false + }; + + match &mut self.tof_snapshot[k] { + PendingOrder::Tx(list) => { + let tx = list.next_to_sim(); + debug_assert!(tx.is_some(), "Unsimmable TxList should have been cleared previously"); + + if let Some(tx) = tx { + senders.send(SequencerToSimulator::SimulateTx(tx, state)); + if postprocess() { + break; + } + } + } + PendingOrder::Bundle(bundle) => { + if !bundle.is_simulated() { + senders.send(SequencerToSimulator::SimulateBundle(bundle.validated(), state)); + if postprocess() { + break; + } + } + } } - i -= 1; } // Send metrics for simulation requests @@ -402,23 +436,29 @@ impl SortingData { } impl SortingData { - pub fn apply_tx(&mut self, tx: SimulatedTx) { - self.db.commit_ref(&tx.result_and_state.state); - TelemetryUpdate::send( - tx.uuid, - tx.to_included_telemetry(self.uuid, self.txs.len()), + pub fn apply_order(&mut self, order: SimulatedOrder) { + let result_and_state = order.result_and_state().expect("order was simulated"); + let sim_time = order.sim_time().expect("order was simulated"); + + self.db.commit_ref(result_and_state.state()); + TelemetryUpdate::send_batch( + order.uuid(), + order.included_telemetry(self.uuid, self.transactions.len()), &mut self.telemetry_producer, ); - let sim_time = tx.sim_time; - let gas_used = tx.as_ref().result.gas_used(); - debug_assert!(self.gas_remaining > gas_used, "had too little gas remaining to apply tx {tx:#?}"); + let gas_used = result_and_state.gas_used(); + debug_assert!(self.gas_remaining > gas_used, "had too little gas remaining to apply order {order:#?}"); self.gas_remaining -= gas_used; - let tx_da_size = tx.tx.estimated_tx_compressed_size(); - self.da_remaining = self.da_remaining.map(|da| da.saturating_sub(tx_da_size)); - self.da_used = self.da_used.saturating_add(tx_da_size); - self.txs.push(tx); + let order_da = order.estimated_da(); + self.da_remaining = self.da_remaining.map(|da| da.saturating_sub(order_da)); + self.da_used = self.da_used.saturating_add(order_da); + + match order { + SimulatedOrder::Tx(tx) => self.transactions.push(tx), + SimulatedOrder::Bundle(bundle) => self.transactions.extend(bundle.into_transactions()), + } // Send metrics for transaction processing MetricsUpdate::send( @@ -429,9 +469,13 @@ impl SortingData { } pub fn maybe_apply(&mut self, base_fee: u64) { - if let Some(tx_to_apply) = std::mem::take(&mut self.next_to_be_applied) { - self.tof_snapshot.remove_from_sender(tx_to_apply.sender(), base_fee); - self.apply_tx(tx_to_apply); + if let Some(order) = std::mem::take(&mut self.next_to_be_applied) { + // Clean up the TOF snapshot + for sender in order.senders() { + self.tof_snapshot.remove_from_sender(sender, base_fee); + } + + self.apply_order(order); } } } @@ -493,7 +537,7 @@ impl + Display>> SortingD self.da_remaining = self.da_remaining.map(|da| da.saturating_sub(tx_da_size)); self.da_used = self.da_used.saturating_add(tx_da_size); self.payment += simulated_tx.payment; - self.txs.push(simulated_tx); + self.transactions.push(simulated_tx); } Ok(()) diff --git a/based/crates/sequencer/src/supervisor.rs b/based/crates/sequencer/src/supervisor.rs deleted file mode 100644 index 66b3ead98..000000000 --- a/based/crates/sequencer/src/supervisor.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::sync::Arc; - -use alloy_consensus::Transaction as _; -use bop_common::transaction::Transaction; -use reth_optimism_txpool::supervisor::{ - ExecutingDescriptor, InteropTxValidatorError, SupervisorClient, parse_access_list_items_to_inbox_entries, -}; -use revm_primitives::B256; -use tracing::warn; - -use crate::config::SuperVisorConfig; - -#[derive(Debug, Clone)] -pub struct SupervisorValidator { - client: SupervisorClient, -} - -impl SupervisorValidator { - pub async fn new(config: &SuperVisorConfig) -> Self { - let client = SupervisorClient::builder(config.url.clone()).minimum_safety(config.safety_level).build().await; - Self { client } - } - - pub fn is_valid(&self, tx: &Arc, timestamp: u64) -> bool { - let Some(access_list) = tx.access_list() else { - return true; - }; - - let inbox_entries = - parse_access_list_items_to_inbox_entries(access_list.iter()).copied().collect::>(); - - let descriptor = ExecutingDescriptor::new(timestamp, None); - let res = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(self.validate_messages(inbox_entries.as_slice(), descriptor)); - #[allow(clippy::match_single_binding)] - match res { - Ok(()) => true, - Err(err) => { - match err { - // // TODO: we should add reconnecting to supervisor in case of disconnect - // InteropTxValidatorError::SupervisorServerError(err) => { - // warn!(%err, ?tx, "Supervisor error, skipping."); - // false - // } - // InteropTxValidatorError::ValidationTimeout(_) => { - // warn!(%err, ?tx, "Cross tx validation timed out, skipping."); - // false - // } - err => { - warn!(%err, ?tx, "Cross tx rejected."); - // It's possible that transaction invalid now, but would be valid later. - // We should keep limited queue for transactions that could become valid. - // We should have the limit to ensure that builder won't get overwhelmed. - false - } - } - } - } - } -} - -impl SupervisorValidator { - pub async fn validate_messages( - &self, - inbox_entries: &[B256], - executing_descriptor: ExecutingDescriptor, - ) -> Result<(), InteropTxValidatorError> { - self.client.check_access_list(inbox_entries, executing_descriptor).await - } -} - -impl From<&SuperVisorConfig> for SupervisorValidator { - fn from(value: &SuperVisorConfig) -> Self { - tokio::runtime::Handle::current().block_on(Self::new(value)) - } -} diff --git a/based/main-node.just b/based/main-node.just index 4e203db0c..ef761ec55 100644 --- a/based/main-node.just +++ b/based/main-node.just @@ -16,9 +16,9 @@ export BASED_OP_NODE_DATA_DIR := env("BASED_OP_NODE_DATA_DIR", join(BASED_MAIN_N export MAIN_NODE_DIR := join(justfile_directory(), "..", "main_node") # External environment configuration -export OP_PROPOSER_KEY := env("OP_PROPOSER_KEY", shell(wallet + " key proposer")) -export OP_BATCHER_KEY := env("OP_BATCHER_KEY", shell(wallet + " key batcher")) -export OP_SEQUENCER_KEY := env("OP_SEQUENCER_KEY", shell(wallet + " key sequencer")) +export OP_PROPOSER_KEY := shell("echo ${OP_PROPOSER_KEY:-$(" + wallet + " key proposer)} | grep . || exit 1") +export OP_BATCHER_KEY := shell("echo ${OP_BATCHER_KEY:-$(" + wallet + " key batcher)} | grep . || exit 1") +export OP_SEQUENCER_KEY := shell("echo ${OP_SEQUENCER_KEY:-$(" + wallet + " key sequencer)} | grep . || exit 1") # Force config creation export FORCE := env("FORCE", "") diff --git a/based/scripts/deploy.just b/based/scripts/deploy.just index db2e44866..d5d6e5f3e 100644 --- a/based/scripts/deploy.just +++ b/based/scripts/deploy.just @@ -13,9 +13,9 @@ export MAIN_NODE_DIR := join(justfile_directory(), "..", "..", "main_node") export L1_CHAIN_ID := env("L1_CHAIN_ID", "11155111") # Default to Sepolia export L2_CHAIN_ID := env("L2_CHAIN_ID", shell(gen-l2-chain-id)) -export OP_PROPOSER_KEY := env("OP_PROPOSER_KEY", shell(wallet + " key proposer")) -export OP_BATCHER_KEY := env("OP_BATCHER_KEY", shell(wallet + " key batcher")) -export OP_SEQUENCER_KEY := env("OP_SEQUENCER_KEY", shell(wallet + " key sequencer")) +export OP_PROPOSER_KEY := shell("echo ${OP_PROPOSER_KEY:-$(" + wallet + " key proposer)} | grep . || exit 1") +export OP_BATCHER_KEY := shell("echo ${OP_BATCHER_KEY:-$(" + wallet + " key batcher)} | grep . || exit 1") +export OP_SEQUENCER_KEY := shell("echo ${OP_SEQUENCER_KEY:-$(" + wallet + " key sequencer)} | grep . || exit 1") # # Force config creation export FORCE := env("FORCE", "") diff --git a/follower_node/compose.dev.yml b/follower_node/compose.dev.yml index 069b4a293..36159daab 100644 --- a/follower_node/compose.dev.yml +++ b/follower_node/compose.dev.yml @@ -11,7 +11,7 @@ services: geth init \ --state.scheme=hash \ --datadir=/data/geth \ - /config/genesis.json + /config/genesis.json fi # 2) then exec the real geth with all your flags exec geth \ @@ -114,5 +114,7 @@ services: - /tmp:/tmp - /dev/shm:/dev/shm - /var/log/containers/based-op/based-gateway:/var/log/app + environment: + - RUST_LOG=debug network_mode: "host" restart: unless-stopped