From 739600d85e730b4b92c393c0e34ec4bc0d3ce501 Mon Sep 17 00:00:00 2001 From: owen Date: Tue, 13 Jan 2026 15:10:14 +0000 Subject: [PATCH 1/2] adds db handle --- Cargo.lock | 1 + crates/common/Cargo.toml | 1 + crates/common/src/adjustments.rs | 1 + crates/common/src/local_cache.rs | 101 ++- crates/common/src/metrics.rs | 10 + crates/relay/src/api/builder/api.rs | 8 +- crates/relay/src/api/builder/gossip.rs | 9 +- crates/relay/src/api/proposer/get_header.rs | 29 +- crates/relay/src/api/proposer/get_payload.rs | 62 +- crates/relay/src/api/proposer/mod.rs | 6 +- crates/relay/src/api/proposer/register.rs | 12 +- crates/relay/src/api/proposer/tests.rs | 32 - crates/relay/src/api/service.rs | 27 +- crates/relay/src/auctioneer/bid_adjustor.rs | 16 +- crates/relay/src/auctioneer/bid_sorter.rs | 4 + crates/relay/src/auctioneer/block_merger.rs | 41 +- crates/relay/src/auctioneer/context.rs | 92 +- crates/relay/src/auctioneer/get_header.rs | 20 +- crates/relay/src/auctioneer/get_payload.rs | 14 +- crates/relay/src/auctioneer/mod.rs | 73 +- .../relay/src/auctioneer/simulator/manager.rs | 21 +- crates/relay/src/auctioneer/simulator/mod.rs | 24 +- crates/relay/src/auctioneer/submit_block.rs | 13 +- crates/relay/src/auctioneer/types.rs | 162 +++- crates/relay/src/auctioneer/worker.rs | 38 +- crates/relay/src/database/handle.rs | 206 +++++ crates/relay/src/database/mod.rs | 23 +- .../postgres/migrations/V39__relay_info.sql | 11 + .../database/postgres/postgres_db_service.rs | 829 ++++++++++-------- .../postgres/postgres_db_service_tests.rs | 50 -- crates/relay/src/database/types/params.rs | 4 +- crates/relay/src/gossip/types.rs | 8 +- crates/relay/src/housekeeper/error.rs | 3 + crates/relay/src/housekeeper/housekeeper.rs | 115 +-- .../src/housekeeper/inclusion_list/service.rs | 18 +- crates/relay/src/housekeeper/mod.rs | 6 +- crates/relay/src/lib.rs | 10 +- crates/relay/src/main.rs | 89 +- crates/simulator/src/block_merging/mod.rs | 2 +- crates/simulator/src/validation/mod.rs | 1 + crates/types/src/bid_submission.rs | 50 +- crates/types/src/blobs.rs | 41 +- crates/types/src/hydration.rs | 11 +- crates/types/src/lib.rs | 4 +- 44 files changed, 1362 insertions(+), 936 deletions(-) create mode 100644 crates/relay/src/database/handle.rs create mode 100644 crates/relay/src/database/postgres/migrations/V39__relay_info.sql diff --git a/Cargo.lock b/Cargo.lock index d5839b15..0bfb8344 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5004,6 +5004,7 @@ dependencies = [ "prometheus", "rand 0.9.2", "reqwest 0.11.27", + "rustc-hash", "serde", "serde_json", "serde_yaml", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 3de10245..0eb9d73b 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -32,6 +32,7 @@ parking_lot.workspace = true prometheus.workspace = true rand.workspace = true reqwest.workspace = true +rustc-hash.workspace = true serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/crates/common/src/adjustments.rs b/crates/common/src/adjustments.rs index d80aec2c..4153dc0b 100644 --- a/crates/common/src/adjustments.rs +++ b/crates/common/src/adjustments.rs @@ -14,4 +14,5 @@ pub struct DataAdjustmentsEntry { pub submitted_value: U256, pub adjusted_block_hash: B256, pub adjusted_value: U256, + pub is_dry_run: bool, } diff --git a/crates/common/src/local_cache.rs b/crates/common/src/local_cache.rs index 2114181d..979cd1ee 100644 --- a/crates/common/src/local_cache.rs +++ b/crates/common/src/local_cache.rs @@ -9,17 +9,22 @@ use axum::{ 
response::{IntoResponse, Response},
 };
 use dashmap::{DashMap, DashSet};
-use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock};
+use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock, SignedValidatorRegistration};
 use http::HeaderValue;
 use parking_lot::RwLock;
+use rustc_hash::FxHashSet;
 use tracing::{error, info};
 
 use crate::{
-    BuilderConfig, BuilderInfo, ProposerInfo,
-    api::builder_api::{
-        BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata,
-        SlotCoordinate,
+    BuilderConfig, BuilderInfo, ProposerInfo, SignedValidatorRegistrationEntry,
+    api::{
+        builder_api::{
+            BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata,
+            SlotCoordinate,
+        },
+        proposer_api::ValidatorRegistrationInfo,
     },
+    chain_info::ChainInfo,
 };
 
 const ESTIMATED_TRUSTED_PROPOSERS: usize = 200_000;
@@ -94,6 +99,14 @@ pub struct LocalCache {
     kill_switch: Arc<AtomicBool>,
     proposer_duties: Arc<RwLock<Vec<BuilderGetValidatorsResponseEntry>>>,
     merged_blocks: Arc<DashMap<B256, MergedBlock>>,
+    pub validator_registration_cache:
+        Arc<DashMap<BlsPublicKeyBytes, SignedValidatorRegistrationEntry>>,
+    pub pending_validator_registrations: Arc<DashSet<BlsPublicKeyBytes>>,
+    pub known_validators_cache: Arc<RwLock<FxHashSet<BlsPublicKeyBytes>>>,
+    pub validator_pool_cache: Arc<DashMap<String, String>>,
+    chain_info: Option<Arc<ChainInfo>>,
+    pub adjustments_enabled: Arc<AtomicBool>,
+    pub adjustments_failsafe_trigger: Arc<AtomicBool>,
 }
 
 impl LocalCache {
@@ -106,6 +119,12 @@ impl LocalCache {
         let kill_switch = Arc::new(AtomicBool::new(false));
         let proposer_duties = Arc::new(RwLock::new(Vec::new()));
         let merged_blocks = Arc::new(DashMap::with_capacity(1000));
+        let validator_registration_cache = Arc::new(DashMap::new());
+        let pending_validator_registrations = Arc::new(DashSet::new());
+        let known_validators_cache = Arc::new(RwLock::new(FxHashSet::default()));
+        let validator_pool_cache = Arc::new(DashMap::new());
+        let adjustments_enabled = Arc::new(AtomicBool::new(false));
+        let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false));
 
         Self {
             inclusion_list: Default::default(),
@@ -116,6 +135,13 @@
             kill_switch,
             proposer_duties,
             merged_blocks,
+            validator_registration_cache,
+            pending_validator_registrations,
+            known_validators_cache,
+            validator_pool_cache,
+            chain_info: None,
+            adjustments_enabled,
+            adjustments_failsafe_trigger,
         }
     }
 
@@ -238,6 +264,71 @@
     pub fn get_merged_block(&self, block_hash: &B256) -> Option<MergedBlock> {
         self.merged_blocks.get(block_hash).map(|b| b.value().clone())
     }
+
+    pub fn is_registration_update_required(
+        &self,
+        registration: &SignedValidatorRegistration,
+    ) -> bool {
+        if let Some(existing_entry) =
+            self.validator_registration_cache.get(&registration.message.pubkey) &&
+            existing_entry.registration_info.registration.message.timestamp >=
+                registration.message.timestamp.saturating_sub(60 * 60) &&
+            existing_entry.registration_info.registration.message.fee_recipient ==
+                registration.message.fee_recipient &&
+            existing_entry.registration_info.registration.message.gas_limit ==
+                registration.message.gas_limit
+        {
+            // re-register at most once per hour, unless the fee recipient / gas limit has changed
+
+            return false;
+        }
+        true
+    }
+
+    /// Assumes the entries are already validated
+    pub fn save_validator_registrations(
+        &self,
+        entries: impl Iterator<Item = ValidatorRegistrationInfo>,
+        pool_name: Option<String>,
+        user_agent: Option<String>,
+    ) {
+        for entry in entries {
+            self.pending_validator_registrations.insert(entry.registration.message.pubkey);
+            self.validator_registration_cache.insert(
+                entry.registration.message.pubkey,
+                SignedValidatorRegistrationEntry::new(
+                    entry.clone(),
+                    pool_name.clone(),
+                    user_agent.clone(),
+                ),
+            );
+        }
+    }
+
+    pub fn get_validator_pool_name(&self, api_key: &str) -> Option<String> {
+        self.validator_pool_cache.get(api_key).map(|v| v.value().clone())
+    }
+
+    pub fn get_chain_info(&self) -> Option<Arc<ChainInfo>> {
+        self.chain_info.clone()
+    }
+
+    pub fn set_chain_info(&mut self, chain_info: Arc<ChainInfo>) {
+        self.chain_info = Some(chain_info);
+    }
+
+    pub fn get_validator_registrations_for_pub_keys(
+        &self,
+        pub_keys: &[BlsPublicKeyBytes],
+    ) -> Vec<SignedValidatorRegistrationEntry> {
+        let mut registrations = Vec::with_capacity(pub_keys.len());
+        for pub_key in pub_keys {
+            if let Some(entry) = self.validator_registration_cache.get(pub_key) {
+                registrations.push(entry.clone());
+            }
+        }
+        registrations
+    }
 }
 
 #[cfg(test)]
diff --git a/crates/common/src/metrics.rs b/crates/common/src/metrics.rs
index ec87e6ed..8bfdaee1 100644
--- a/crates/common/src/metrics.rs
+++ b/crates/common/src/metrics.rs
@@ -255,6 +255,12 @@ lazy_static! {
     )
     .unwrap();
 
+    static ref ADJUSTMENTS_DISABLED_COUNT: IntCounter = register_int_counter_with_registry!(
+        "adjustments_disabled_count_total",
+        "Count of adjustments disablements",
+        &RELAY_METRICS_REGISTRY
+    )
+    .unwrap();
 
     static ref SIMULATOR_SYNC: GaugeVec = register_gauge_vec_with_registry!(
         "simulator_synced",
@@ -739,6 +745,10 @@ impl SimulatorMetrics {
         BUILDER_DEMOTION_COUNT.inc();
     }
 
+    pub fn disable_adjustments() {
+        ADJUSTMENTS_DISABLED_COUNT.inc();
+    }
+
     pub fn simulator_sync(simulator: &str, is_synced: bool) {
         SIMULATOR_SYNC.with_label_values(&[simulator]).set(is_synced as i64 as f64);
     }
diff --git a/crates/relay/src/api/builder/api.rs b/crates/relay/src/api/builder/api.rs
index b87bf00a..40104026 100644
--- a/crates/relay/src/api/builder/api.rs
+++ b/crates/relay/src/api/builder/api.rs
@@ -4,8 +4,8 @@ use axum::{Extension, http::StatusCode, response::IntoResponse};
 use helix_common::{RelayConfig, api::builder_api::TopBidUpdate, local_cache::LocalCache};
 
 use crate::{
-    api::Api, auctioneer::AuctioneerHandle,
-    database::postgres::postgres_db_service::PostgresDatabaseService, housekeeper::CurrentSlotInfo,
+    api::Api, auctioneer::AuctioneerHandle, database::handle::DbHandle,
+    housekeeper::CurrentSlotInfo,
 };
 
 pub(crate) const MAX_PAYLOAD_LENGTH: usize = 1024 * 1024 * 20; // 20MB
 
 #[derive(Clone)]
 pub struct BuilderApi<A: Api> {
     pub local_cache: Arc<LocalCache>,
-    pub db: Arc<PostgresDatabaseService>,
+    pub db: DbHandle,
     pub curr_slot_info: CurrentSlotInfo,
     pub relay_config: Arc<RelayConfig>,
     /// Subscriber for TopBid updates, SSZ encoded
@@ -25,7 +25,7 @@ impl<A: Api> BuilderApi<A> {
     pub fn new(
         local_cache: Arc<LocalCache>,
-        db: Arc<PostgresDatabaseService>,
+        db: DbHandle,
         relay_config: RelayConfig,
         curr_slot_info: CurrentSlotInfo,
         top_bid_tx: tokio::sync::broadcast::Sender,
diff --git a/crates/relay/src/api/builder/gossip.rs b/crates/relay/src/api/builder/gossip.rs
index aa8dc8f5..e9c0881d 100644
--- a/crates/relay/src/api/builder/gossip.rs
+++ b/crates/relay/src/api/builder/gossip.rs
@@ -1,4 +1,4 @@
-use helix_common::{GossipedPayloadTrace, spawn_tracked, utils::utcnow_ns};
+use helix_common::{GossipedPayloadTrace, utils::utcnow_ns};
 use tracing::{debug, error};
 use uuid::Uuid;
 
@@ -20,11 +20,6 @@ impl<A: Api> BuilderApi<A> {
         }
 
         // Save gossiped payload trace to db
-        let db = self.db.clone();
-        spawn_tracked!(async move {
-            if let Err(err) = db.save_gossiped_payload_trace(block_hash, trace).await {
-                error!(%err, "failed to store gossiped payload trace")
-            }
-        });
+        self.db.save_gossiped_payload_trace(block_hash, trace);
     }
 }
diff --git a/crates/relay/src/api/proposer/get_header.rs b/crates/relay/src/api/proposer/get_header.rs
index aebdce3a..2279000e 100644
--- 
a/crates/relay/src/api/proposer/get_header.rs
+++ b/crates/relay/src/api/proposer/get_header.rs
@@ -115,29 +115,18 @@ impl<A: Api> ProposerApi<A> {
         let bid_block_hash = *bid.block_hash();
         let value = *bid.value();
 
-        let db = proposer_api.db.clone();
-        spawn_tracked!(
-            async move {
-                if let Err(err) = db
-                    .save_get_header_call(
-                        params,
-                        bid_block_hash,
-                        value,
-                        trace,
-                        is_mev_boost,
-                        user_agent,
-                    )
-                    .await
-                {
-                    error!(%err, "error saving get header call to database");
-                }
-            }
-            .in_current_span()
+        proposer_api.db.save_get_header_call(
+            params,
+            bid_block_hash,
+            value,
+            trace,
+            is_mev_boost,
+            user_agent,
         );
 
         let fork = proposer_api.chain_info.current_fork_name();
-        let payload_and_blobs = bid.payload_and_blobs.clone();
-        let bid_data = bid.bid_data.clone();
+        let payload_and_blobs = bid.payload_and_blobs();
+        let bid_data = bid.bid_data_ref().to_owned();
         let bid = bid.into_builder_bid_slow();
 
         let signed_bid = resign_builder_bid(bid, &proposer_api.signing_context, fork);
diff --git a/crates/relay/src/api/proposer/get_payload.rs b/crates/relay/src/api/proposer/get_payload.rs
index 7a2cda4e..d658374a 100644
--- a/crates/relay/src/api/proposer/get_payload.rs
+++ b/crates/relay/src/api/proposer/get_payload.rs
@@ -124,13 +124,12 @@ impl<A: Api> ProposerApi<A> {
             Ok(get_payload_response) => Ok(axum::Json(get_payload_response)),
             Err(err) => {
                 // Save error to DB
-                if let Err(err) = proposer_api
-                    .db
-                    .save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace)
-                    .await
-                {
-                    error!(err = ?err, "error saving failed get payload");
-                }
+                proposer_api.db.save_failed_get_payload(
+                    slot.into(),
+                    block_hash,
+                    err.to_string(),
+                    trace,
+                );
 
                 Err(err)
             }
@@ -220,13 +219,12 @@ impl<A: Api> ProposerApi<A> {
             Ok(_get_payload_response) => Ok(StatusCode::ACCEPTED),
             Err(err) => {
                 // Save error to DB
-                if let Err(err) = proposer_api
-                    .db
-                    .save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace)
-                    .await
-                {
-                    error!(err = ?err, "error saving failed get payload");
-                }
+                proposer_api.db.save_failed_get_payload(
+                    slot.into(),
+                    block_hash,
+                    err.to_string(),
+                    trace,
+                );
 
                 Err(err)
             }
@@ -293,19 +291,13 @@ impl<A: Api> ProposerApi<A> {
             warn!(error = %err, "get_payload was sent too late");
 
             // Save too late request to db for debugging
-            if let Err(err) = self
-                .db
-                .save_too_late_get_payload(
-                    (head_slot + 1).into(),
-                    &proposer_public_key,
-                    &block_hash,
-                    trace.receive,
-                    trace.payload_fetched,
-                )
-                .await
-            {
-                error!(%err, "failed to save too late get payload");
-            }
+            self.db.save_too_late_get_payload(
+                (head_slot + 1).into(),
+                proposer_public_key,
+                block_hash,
+                trace.receive,
+                trace.payload_fetched,
+            );
 
             return Err(err);
         }
@@ -351,7 +343,7 @@ impl<A: Api> ProposerApi<A> {
             .save_delivered_payload_info(
                 slot.as_u64(),
                 payload_clone,
-                bid,
+                &bid,
                 proposer_public_key,
                 proposer_fee_recipient,
                 &trace_clone,
@@ -433,15 +425,14 @@ impl<A: Api> ProposerApi<A> {
     async fn save_delivered_payload_info(
         &self,
         slot: u64,
-        payload: Arc<PayloadAndBlobs>,
-        bid: PayloadBidData,
+        payload: PayloadAndBlobs,
+        bid: &PayloadBidData,
         proposer_public_key: BlsPublicKeyBytes,
         proposer_fee_recipient: Address,
         trace: &GetPayloadTrace,
         user_agent: Option<String>,
         filtering: Filtering,
     ) {
-        let db = self.db.clone();
         let trace = *trace;
         let params = SavePayloadParams {
             slot,
@@ -449,16 +440,13 @@ impl<A: Api> ProposerApi<A> {
             proposer_pub_key: proposer_public_key,
             value: bid.value,
             proposer_fee_recipient,
-            payload: payload.clone(),
+            payload,
             latency_trace: trace,
             user_agent,
             filtering,
         };
-
-        spawn_tracked!(async move {
-            if let Err(err) = db.save_delivered_payload(&params).await {
-                error!(%err, "error saving payload to database");
-            }
-        });
+
+        self.db.save_delivered_payload(params);
     }
 
     pub(crate) async fn gossip_payload(
diff --git a/crates/relay/src/api/proposer/mod.rs b/crates/relay/src/api/proposer/mod.rs
index a5834ce4..541b6e73 100644
--- a/crates/relay/src/api/proposer/mod.rs
+++ b/crates/relay/src/api/proposer/mod.rs
@@ -19,7 +19,7 @@ use crate::{
     api::{Api, router::Terminating},
     auctioneer::{AuctioneerHandle, RegWorkerHandle},
     beacon::multi_beacon_client::MultiBeaconClient,
-    database::postgres::postgres_db_service::PostgresDatabaseService,
+    database::handle::DbHandle,
     gossip::GrpcGossiperClientManager,
     housekeeper::CurrentSlotInfo,
 };
@@ -27,7 +27,7 @@
 #[derive(Clone)]
 pub struct ProposerApi<A: Api> {
     pub local_cache: Arc<LocalCache>,
-    pub db: Arc<PostgresDatabaseService>,
+    pub db: DbHandle,
     pub gossiper: Arc<GrpcGossiperClientManager>,
     pub multi_beacon_client: Arc<MultiBeaconClient>,
     pub api_provider: Arc<A>,
@@ -45,7 +45,7 @@ impl<A: Api> ProposerApi<A> {
     pub fn new(
         local_cache: Arc<LocalCache>,
-        db: Arc<PostgresDatabaseService>,
+        db: DbHandle,
         gossiper: Arc<GrpcGossiperClientManager>,
         api_provider: Arc<A>,
         signing_context: Arc,
diff --git a/crates/relay/src/api/proposer/register.rs b/crates/relay/src/api/proposer/register.rs
index 4e1cedb5..96557fcd 100644
--- a/crates/relay/src/api/proposer/register.rs
+++ b/crates/relay/src/api/proposer/register.rs
@@ -58,7 +58,7 @@ impl<A: Api> ProposerApi<A> {
     let api_key = headers.get(HEADER_API_KEY).and_then(|key| key.to_str().ok());
 
     let pool_name = match api_key {
-        Some(api_key) => match proposer_api.db.get_validator_pool_name(api_key).await? {
+        Some(api_key) => match proposer_api.local_cache.get_validator_pool_name(api_key) {
             Some(pool_name) => Some(pool_name),
             None => {
                 warn!("Invalid api key provided");
@@ -119,7 +119,7 @@ impl<A: Api> ProposerApi<A> {
     let mut skipped_registrations = 0;
 
     let registrations_to_check: Vec<_> = {
-        let known_validators_guard = proposer_api.db.known_validators_cache().read();
+        let known_validators_guard = proposer_api.local_cache.known_validators_cache.read();
         registrations
             .into_iter()
             .filter(|reg| {
@@ -131,7 +131,7 @@ impl<A: Api> ProposerApi<A> {
                 }
             })
             .filter(|reg| {
-                if proposer_api.db.is_registration_update_required(reg) {
+                if proposer_api.local_cache.is_registration_update_required(reg) {
                     true
                 } else {
                     skipped_registrations += 1;
@@ -208,7 +208,11 @@ impl<A: Api> ProposerApi<A> {
         }
     });
 
-    proposer_api.db.save_validator_registrations(registrations_to_save, pool_name, user_agent);
+    proposer_api.local_cache.save_validator_registrations(
+        registrations_to_save,
+        pool_name,
+        user_agent,
+    );
 
     info!(
         ?process_time,
diff --git a/crates/relay/src/api/proposer/tests.rs b/crates/relay/src/api/proposer/tests.rs
index 2a2e0f2a..c5e2969f 100644
--- a/crates/relay/src/api/proposer/tests.rs
+++ b/crates/relay/src/api/proposer/tests.rs
@@ -1123,36 +1123,4 @@ mod proposer_api_tests {
 
         let _ = tx.send(());
     }
-
-    #[tokio::test]
-    async fn test_get_validator_preferences() {
-        let (tx, http_config, _api, _curr_slot_info, _auctioneer) = start_api_server().await;
-
-        let validator_pubkey = get_fixed_pubkey(0);
-
-        let req_url = format!(
-            "{}{}{}/{}",
-            http_config.base_url(),
-            PATH_PROPOSER_API,
-            PATH_GET_VALIDATOR_PREFERENCES,
-            validator_pubkey.as_hex_string()
-        );
-
-        let resp = reqwest::Client::new()
-            .get(&req_url)
-            .header("X-Api-Key", "valid-api-key")
-            .send()
-            .await
-            .unwrap();
-
-        assert_eq!(resp.status(), StatusCode::OK);
-
-        let body = resp.text().await.unwrap();
-        let preferences: ValidatorPreferences = serde_json::from_str(&body).unwrap();
-
-        assert!(preferences.header_delay);
-        assert!(preferences.trusted_builders.is_none());
-
-        let _ = tx.send(());
-    }
 }
diff --git a/crates/relay/src/api/service.rs b/crates/relay/src/api/service.rs
index 9d9465a8..3096ebcc 100644
--- a/crates/relay/src/api/service.rs
+++ b/crates/relay/src/api/service.rs
@@ -12,7 +12,7 @@ use moka::sync::Cache;
 use tracing::{error, info};
 
 use crate::{
-    BidAdjustor,
+    PostgresDatabaseService,
     api::{
         Api,
         builder::api::BuilderApi,
@@ -20,31 +20,31 @@ use crate::{
         relay_data::{BidsCache, DataApi, DeliveredPayloadsCache, SelectiveExpiry},
         router::build_router,
     },
-    auctioneer::{Event, spawn_workers},
+    auctioneer::{AuctioneerHandle, RegWorkerHandle},
     beacon::multi_beacon_client::MultiBeaconClient,
-    database::postgres::postgres_db_service::PostgresDatabaseService,
+    database::handle::DbHandle,
     gossip::{GrpcGossiperClientManager, process_gossip_messages},
     housekeeper::CurrentSlotInfo,
     network::api::RelayNetworkApi,
 };
 
 pub(crate) const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
-pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
 
 pub async fn start_api_service<A: Api>(
     mut config: RelayConfig,
     db: Arc<PostgresDatabaseService>,
+    db_handle: DbHandle,
     local_cache: Arc<LocalCache>,
     current_slot_info: CurrentSlotInfo,
     chain_info: Arc<ChainInfo>,
     relay_signing_context: Arc,
     multi_beacon_client: Arc<MultiBeaconClient>,
     api_provider: Arc<A>,
-    bid_adjustor: impl BidAdjustor,
     known_validators_loaded: Arc<AtomicBool>,
     terminating: Arc<Terminating>,
     top_bid_tx: tokio::sync::broadcast::Sender,
-    event_channel: (crossbeam_channel::Sender<Event>, crossbeam_channel::Receiver<Event>),
+    auctioneer_handle: AuctioneerHandle,
+    registrations_handle: RegWorkerHandle,
     relay_network_api: RelayNetworkApi,
 ) {
     let gossiper = Arc::new(
@@ -57,20 +57,9 @@ pub async fn start_api_service<A: Api>(
 
     let (gossip_sender, gossip_receiver) = tokio::sync::mpsc::channel(10_000);
 
-    // spawn auctioneer
-    let (auctioneer_handle, registrations_handle) = spawn_workers(
-        Arc::unwrap_or_clone(chain_info.clone()),
-        config.clone(),
-        db.clone(),
-        Arc::unwrap_or_clone(local_cache.clone()),
-        bid_adjustor,
-        top_bid_tx.clone(),
-        event_channel,
-    );
-
     let builder_api = BuilderApi::<A>::new(
         local_cache.clone(),
-        db.clone(),
+        db_handle.clone(),
        config.clone(),
        current_slot_info.clone(),
        top_bid_tx,
@@ -83,7 +72,7 @@ pub async fn start_api_service<A: Api>(
 
     let proposer_api = Arc::new(ProposerApi::<A>::new(
         local_cache,
-        db.clone(),
+        db_handle.clone(),
         gossiper.clone(),
         api_provider.clone(),
         relay_signing_context,
diff --git a/crates/relay/src/auctioneer/bid_adjustor.rs b/crates/relay/src/auctioneer/bid_adjustor.rs
index 91bb65c5..791f1209 100644
--- a/crates/relay/src/auctioneer/bid_adjustor.rs
+++ b/crates/relay/src/auctioneer/bid_adjustor.rs
@@ -1,7 +1,12 @@
-use crate::auctioneer::PayloadEntry;
+use crate::auctioneer::{PayloadEntry, SimulatorRequest, types::SlotData};
 
 pub trait BidAdjustor: Send + Sync + 'static {
-    fn try_apply_adjustments(&self, bid: &PayloadEntry) -> Option<PayloadEntry>;
+    fn try_apply_adjustments(
+        &self,
+        bid: &PayloadEntry,
+        slot_data: &SlotData,
+        is_dry_run: bool,
+    ) -> Option<(PayloadEntry, SimulatorRequest)>;
 
     fn on_new_slot(&mut self, bid_slot: u64);
 }
@@ -9,7 +14,12 @@
 pub struct DefaultBidAdjustor;
 
 impl BidAdjustor for DefaultBidAdjustor {
-    fn try_apply_adjustments(&self, _bid: &PayloadEntry) -> Option<PayloadEntry> {
+    fn try_apply_adjustments(
+        &self,
+        _bid: &PayloadEntry,
+        _slot_data: &SlotData,
+        _is_dry_run: bool,
+    ) -> Option<(PayloadEntry, SimulatorRequest)> {
         None
     }
 
diff --git a/crates/relay/src/auctioneer/bid_sorter.rs b/crates/relay/src/auctioneer/bid_sorter.rs
index a0d6631d..35cdb3b1 100644
--- 
a/crates/relay/src/auctioneer/bid_sorter.rs +++ b/crates/relay/src/auctioneer/bid_sorter.rs @@ -206,6 +206,10 @@ impl BidSorter { self.forks.get(parent_hash).and_then(|s| s.curr_bid.as_ref().map(|b| b.1.block_hash)) } + pub fn get_any_top_bid(&self) -> Option { + self.forks.iter().next().and_then(|(_, s)| s.curr_bid.as_ref().map(|b| b.1.block_hash)) + } + fn process_header( &mut self, new_pubkey: BlsPublicKeyBytes, diff --git a/crates/relay/src/auctioneer/block_merger.rs b/crates/relay/src/auctioneer/block_merger.rs index 090b7eca..00cc2ae7 100644 --- a/crates/relay/src/auctioneer/block_merger.rs +++ b/crates/relay/src/auctioneer/block_merger.rs @@ -8,8 +8,7 @@ use alloy_consensus::{Bytes48, Transaction, TxEip4844, TxEnvelope, TxType}; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_rlp::Decodable; use helix_common::{ - RelayConfig, chain_info::ChainInfo, local_cache::LocalCache, metrics::MERGE_TRACE_LATENCY, - utils::utcnow_ms, + RelayConfig, local_cache::LocalCache, metrics::MERGE_TRACE_LATENCY, utils::utcnow_ms, }; use helix_types::{ BlobWithMetadata, BlobWithMetadataV1, BlobWithMetadataV2, BlobsBundle, BlobsBundleVersion, @@ -64,7 +63,6 @@ pub enum OrderValidationError { pub struct BlockMerger { curr_bid_slot: u64, config: RelayConfig, - chain_info: ChainInfo, local_cache: LocalCache, best_merged_block: Option, best_mergeable_orders: BestMergeableOrders, @@ -86,16 +84,10 @@ pub struct BlockMerger { } impl BlockMerger { - pub fn new( - curr_bid_slot: u64, - chain_info: ChainInfo, - local_cache: LocalCache, - config: RelayConfig, - ) -> Self { + pub fn new(curr_bid_slot: u64, local_cache: LocalCache, config: RelayConfig) -> Self { Self { curr_bid_slot, config, - chain_info, local_cache, best_merged_block: None, best_mergeable_orders: BestMergeableOrders::new(), @@ -156,9 +148,7 @@ impl BlockMerger { return None; } - if entry.bid.payload_and_blobs.execution_payload.parent_hash != - original_bid.payload_and_blobs.execution_payload.parent_hash - { + if entry.bid.parent_hash() != original_bid.parent_hash() { trace!("merged bid parent hash does not match original bid parent hash"); return None; } @@ -281,13 +271,17 @@ impl BlockMerger { pub fn prepare_merged_payload_for_storage( &mut self, response: BlockMergeResponse, - original_payload: Arc, + original_payload: PayloadAndBlobs, builder_pubkey: BlsPublicKeyBytes, ) -> Result { debug!(?response.builder_inclusions, %response.proposer_value, "preparing merged payload for storage"); let start_time = Instant::now(); let bid_slot = self.curr_bid_slot; - let max_blobs_per_block = self.chain_info.max_blobs_per_block(); + let max_blobs_per_block = self + .local_cache + .get_chain_info() + .expect("chain info should be cached") + .max_blobs_per_block(); let base_block_data = match self.appendable_blocks.get(&response.base_block_hash) { Some(data) => data, @@ -329,7 +323,7 @@ impl BlockMerger { let blobs = &self.best_mergeable_orders.mergeable_blob_bundles; - let mut merged_blobs_bundle = original_payload.blobs_bundle.clone(); + let mut merged_blobs_bundle = original_payload.blobs_bundle.as_ref().to_owned(); append_merged_blobs( &mut merged_blobs_bundle, blobs, @@ -339,10 +333,10 @@ impl BlockMerger { let withdrawals_root = response.execution_payload.withdrawals_root(); - let payload_and_blobs = Arc::new(PayloadAndBlobs { - execution_payload: response.execution_payload, - blobs_bundle: merged_blobs_bundle, - }); + let payload_and_blobs = PayloadAndBlobs { + execution_payload: Arc::new(response.execution_payload), + blobs_bundle: 
Arc::new(merged_blobs_bundle), + }; let bid_data = PayloadBidData { withdrawals_root, @@ -354,12 +348,7 @@ impl BlockMerger { trace!(%block_hash, %response.proposer_value, "blobs appended to merged payload"); - let new_bid = PayloadEntry { - payload_and_blobs: payload_and_blobs.clone(), - bid_data: bid_data.clone(), - bid_adjustment_data: None, - submission_version: None, - }; + let new_bid = PayloadEntry::new_gossip(payload_and_blobs, bid_data); // Store locally to serve header requests self.best_merged_block = Some(BestMergedBlock { diff --git a/crates/relay/src/auctioneer/context.rs b/crates/relay/src/auctioneer/context.rs index f0164a65..0f434b6e 100644 --- a/crates/relay/src/auctioneer/context.rs +++ b/crates/relay/src/auctioneer/context.rs @@ -1,13 +1,12 @@ use std::{ ops::{Deref, DerefMut}, - sync::{Arc, atomic::Ordering}, - time::Instant, + sync::atomic::Ordering, + time::{Duration, Instant}, }; use alloy_primitives::{B256, U256}; use helix_common::{ - BuilderInfo, RelayConfig, chain_info::ChainInfo, local_cache::LocalCache, - metrics::SimulatorMetrics, spawn_tracked, + BuilderInfo, RelayConfig, local_cache::LocalCache, metrics::SimulatorMetrics, spawn_tracked, }; use helix_types::{BlsPublicKeyBytes, HydrationCache, Slot, SubmissionVersion}; use rustc_hash::FxHashMap; @@ -16,14 +15,14 @@ use tracing::{error, info, warn}; use crate::{ api::builder::error::BuilderApiError, auctioneer::{ - BlockMergeResponse, + BlockMergeResponse, Event, bid_adjustor::BidAdjustor, bid_sorter::BidSorter, block_merger::BlockMerger, simulator::manager::{SimulationResult, SimulatorManager}, types::{PayloadEntry, PendingPayload}, }, - database::postgres::postgres_db_service::PostgresDatabaseService, + database::handle::DbHandle, }; // Context that is only valid for a given slot @@ -41,11 +40,10 @@ pub struct SlotContext { } pub struct Context { - pub chain_info: ChainInfo, pub config: RelayConfig, pub cache: LocalCache, pub unknown_builder_info: BuilderInfo, - pub db: Arc, + pub db: DbHandle, pub slot_context: SlotContext, pub bid_adjustor: B, } @@ -53,15 +51,17 @@ pub struct Context { const EXPECTED_PAYLOADS_PER_SLOT: usize = 5000; const EXPECTED_BUILDERS_PER_SLOT: usize = 200; +const ADJUSTMENTS_DRY_RUN_INTERVAL: Duration = Duration::from_millis(500); + impl Context { pub fn new( - chain_info: ChainInfo, config: RelayConfig, sim_manager: SimulatorManager, - db: Arc, + db: DbHandle, bid_sorter: BidSorter, cache: LocalCache, bid_adjustor: B, + auctioneer: crossbeam_channel::Sender, ) -> Self { let unknown_builder_info = BuilderInfo { collateral: U256::ZERO, @@ -72,7 +72,7 @@ impl Context { api_key: None, }; - let block_merger = BlockMerger::new(0, chain_info.clone(), cache.clone(), config.clone()); + let block_merger = BlockMerger::new(0, cache.clone(), config.clone()); let slot_context = SlotContext { sim_manager, @@ -91,7 +91,9 @@ impl Context { block_merger, }; - Self { chain_info, cache, unknown_builder_info, slot_context, db, config, bid_adjustor } + Self::spawn_adjustments_dry_run_task(auctioneer, cache.clone()); + + Self { cache, unknown_builder_info, slot_context, db, config, bid_adjustor } } pub fn builder_info(&self, builder: &BlsPublicKeyBytes) -> BuilderInfo { @@ -117,7 +119,20 @@ impl Context { if let Err(err) = result.result.as_ref() && err.is_demotable() { - if self.cache.demote_builder(&builder) { + if self.payloads.get(&block_hash).is_some_and(|bid| bid.is_adjusted()) { + warn!(%builder, %block_hash, %err, "block simulation resulted in an error. 
Disabling adjustments..."); + + if !self.cache.adjustments_enabled.load(Ordering::Relaxed) { + warn!(%builder, %block_hash, %err, "adjustments already disabled"); + } else { + SimulatorMetrics::disable_adjustments(); + self.cache.adjustments_enabled.store(false, Ordering::Relaxed); + let db = self.db.clone(); + let failsafe_trigger = self.cache.adjustments_failsafe_trigger.clone(); + let adjustments_enabled = self.cache.adjustments_enabled.clone(); + db.disable_adjustments(block_hash, failsafe_trigger, adjustments_enabled); + } + } else if self.cache.demote_builder(&builder) { warn!(%builder, %block_hash, %err, "Block simulation resulted in an error. Demoting builder..."); SimulatorMetrics::demotion_count(); @@ -125,30 +140,19 @@ impl Context { let reason = err.to_string(); let bid_slot = result.submission.slot(); let failsafe_triggered = self.sim_manager.failsafe_triggered.clone(); - - let db = self.db.clone(); - spawn_tracked!(async move { - if let Err(err) = - db.db_demote_builder(bid_slot.as_u64(), &builder, &block_hash, reason).await - { - failsafe_triggered.store(true, Ordering::Relaxed); - error!(%builder, %err, %block_hash, "failed to demote builder in database! Pausing all optmistic submissions"); - } - }); + self.db.db_demote_builder( + bid_slot.as_u64(), + builder, + block_hash, + reason, + failsafe_triggered, + ); } else { warn!(%err, %builder, %block_hash, "builder already demoted, skipping demotion"); } }; - let db = self.db.clone(); - spawn_tracked!(async move { - if let Err(err) = db - .store_block_submission(result.submission, result.trace, result.optimistic_version) - .await - { - error!(%err, "failed to store block submission") - } - }); + self.db.store_block_submission(result.submission, result.trace, result.optimistic_version); if let Some(res_tx) = result.res_tx { // submission was initially valid but by the time sim finished the slot already @@ -196,8 +200,8 @@ impl Context { return; }; - let original_payload_and_blobs = original_payload.payload_and_blobs.clone(); - let builder_pubkey = original_payload.bid_data.builder_pubkey; + let original_payload_and_blobs = original_payload.payload_and_blobs(); + let builder_pubkey = *original_payload.bid_data_ref().builder_pubkey; //TODO: this function does a lot of work, should move that work away from the event loop let Some(payload) = self @@ -214,6 +218,26 @@ impl Context { }; self.payloads.insert(block_hash, payload); } + + fn spawn_adjustments_dry_run_task( + auctioneer: crossbeam_channel::Sender, + cache: LocalCache, + ) { + spawn_tracked!(async move { + let mut interval = tokio::time::interval(ADJUSTMENTS_DRY_RUN_INTERVAL); + loop { + interval.tick().await; + + if !cache.adjustments_enabled.load(Ordering::Relaxed) { + return; + } + + if let Err(e) = auctioneer.try_send(Event::DryRunAdjustments) { + error!("failed to send adjustments dry run request: {}", e); + } + } + }); + } } impl Deref for Context { diff --git a/crates/relay/src/auctioneer/get_header.rs b/crates/relay/src/auctioneer/get_header.rs index 398edccb..ce39aa53 100644 --- a/crates/relay/src/auctioneer/get_header.rs +++ b/crates/relay/src/auctioneer/get_header.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::Ordering; + use alloy_primitives::B256; use helix_common::api::proposer_api::GetHeaderParams; use tokio::sync::oneshot; @@ -5,20 +7,25 @@ use tracing::{error, warn}; use crate::{ api::proposer::ProposerApiError, - auctioneer::{bid_adjustor::BidAdjustor, context::Context, types::GetHeaderResult}, + auctioneer::{ + bid_adjustor::BidAdjustor, + context::Context, 
+ types::{GetHeaderResult, SlotData}, + }, }; impl Context { pub(super) fn handle_get_header( &mut self, params: GetHeaderParams, + slot_data: &SlotData, res_tx: oneshot::Sender, ) { assert_eq!(params.slot, self.bid_slot.as_u64(), "params should already be validated!"); - let _ = res_tx.send(self.get_header(params.parent_hash)); + let _ = res_tx.send(self.get_header(params.parent_hash, slot_data)); } - fn get_header(&mut self, parent_hash: B256) -> GetHeaderResult { + fn get_header(&mut self, parent_hash: B256, slot_data: &SlotData) -> GetHeaderResult { let Some(best_block_hash) = self.bid_sorter.get_header(&parent_hash) else { warn!(%parent_hash, "no bids for this fork"); return Err(ProposerApiError::NoBidPrepared); @@ -33,10 +40,15 @@ impl Context { return Ok(merged_bid); }; - if let Some(adjusted_bid) = self.bid_adjustor.try_apply_adjustments(original_bid) { + if self.cache.adjustments_enabled.load(Ordering::Relaxed) && + let Some((adjusted_bid, sim_request)) = + self.bid_adjustor.try_apply_adjustments(original_bid, slot_data, false) + { let block_hash = adjusted_bid.block_hash(); self.payloads.insert(*block_hash, adjusted_bid.clone()); + self.sim_manager.handle_sim_request(sim_request, true); + return Ok(adjusted_bid); } diff --git a/crates/relay/src/auctioneer/get_payload.rs b/crates/relay/src/auctioneer/get_payload.rs index fa56eaa9..d7607e51 100644 --- a/crates/relay/src/auctioneer/get_payload.rs +++ b/crates/relay/src/auctioneer/get_payload.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use alloy_primitives::B256; use helix_common::GetPayloadTrace; use helix_types::{ @@ -38,7 +36,7 @@ impl Context { } let block_hash = payload.execution_payload.execution_payload.block_hash; - let entry = PayloadEntry::new_gossip(payload); + let entry = PayloadEntry::new_gossip(payload.execution_payload, payload.bid_data); self.payloads.entry(block_hash).or_insert(entry); } @@ -46,7 +44,7 @@ impl Context { #[must_use] pub(super) fn handle_get_payload( &mut self, - local: Arc, + local: PayloadAndBlobs, blinded: SignedBlindedBeaconBlock, trace: GetPayloadTrace, res_tx: oneshot::Sender, @@ -85,12 +83,12 @@ impl Context { info!("found payload for pending get_payload"); let PendingPayload { blinded, res_tx, trace, .. 
} = pending; self.handle_get_payload( - local.payload_and_blobs.clone(), + local.payload_and_blobs(), blinded, trace, res_tx, slot_data, - local.bid_data.clone(), + local.bid_data_ref().to_owned(), ) } else { self.pending_payload = Some(pending); @@ -101,7 +99,7 @@ impl Context { fn get_payload( &self, blinded: SignedBlindedBeaconBlock, - local: Arc, + local: PayloadAndBlobs, trace: GetPayloadTrace, slot_data: &SlotData, ) -> Result<(GetPayloadResponse, VersionedSignedProposal, GetPayloadTrace), ProposerApiError> @@ -152,7 +150,7 @@ impl Context { pub fn unblind( &self, blinded: SignedBlindedBeaconBlock, - local: Arc, + local: PayloadAndBlobs, slot_data: &SlotData, ) -> Result<(GetPayloadResponse, VersionedSignedProposal), ProposerApiError> { match blinded { diff --git a/crates/relay/src/auctioneer/mod.rs b/crates/relay/src/auctioneer/mod.rs index 8385aaad..1584564c 100644 --- a/crates/relay/src/auctioneer/mod.rs +++ b/crates/relay/src/auctioneer/mod.rs @@ -14,7 +14,6 @@ mod worker; use std::{ cmp::Ordering, - sync::Arc, time::{Duration, Instant}, }; @@ -26,7 +25,6 @@ use helix_common::{ api::builder_api::{ BuilderGetValidatorsResponseEntry, InclusionListWithMetadata, TopBidUpdate, }, - chain_info::ChainInfo, local_cache::LocalCache, metrics::{STATE_TRANSITION_COUNT, STATE_TRANSITION_LATENCY, WORKER_QUEUE_LEN, WORKER_UTIL}, record_submission_step, @@ -36,30 +34,27 @@ use helix_types::Slot; use rustc_hash::FxHashMap; pub use simulator::*; use tracing::{debug, error, info, info_span, trace, warn}; -pub use types::{Event, GetPayloadResultData, PayloadBidData, PayloadEntry}; +pub use types::{ + Event, GetPayloadResultData, PayloadBidData, PayloadEntry, SlotData, SubmissionPayload, +}; use worker::{RegWorker, SubWorker}; pub use crate::auctioneer::{ bid_adjustor::{BidAdjustor, DefaultBidAdjustor}, - simulator::client::SimulatorClient, + simulator::{SimulatorRequest, client::SimulatorClient}, }; use crate::{ - PostgresDatabaseService, api::{builder::error::BuilderApiError, proposer::ProposerApiError}, auctioneer::{ - bid_sorter::BidSorter, - context::Context, - manager::SimulatorManager, - types::{PendingPayload, SlotData}, + bid_sorter::BidSorter, context::Context, manager::SimulatorManager, types::PendingPayload, }, + database::handle::DbHandle, housekeeper::PayloadAttributesUpdate, }; -// TODO: tidy up builder and proposer api state, and spawn in a separate function pub fn spawn_workers( - chain_info: ChainInfo, config: RelayConfig, - db: Arc, + db: DbHandle, cache: LocalCache, bid_adjustor: B, top_bid_tx: tokio::sync::broadcast::Sender, @@ -71,7 +66,7 @@ pub fn spawn_workers( if config.is_registration_instance { for core in config.cores.reg_workers.clone() { - let worker = RegWorker::new(core, chain_info.clone()); + let worker = RegWorker::new(core, cache.clone()); let rx = reg_worker_rx.clone(); std::thread::Builder::new() @@ -86,13 +81,7 @@ pub fn spawn_workers( if config.is_submission_instance { for core in config.cores.sub_workers.clone() { - let worker = SubWorker::new( - core, - event_tx.clone(), - cache.clone(), - chain_info.clone(), - config.clone(), - ); + let worker = SubWorker::new(core, event_tx.clone(), cache.clone(), config.clone()); let rx = sub_worker_rx.clone(); std::thread::Builder::new() @@ -108,8 +97,15 @@ pub fn spawn_workers( let bid_sorter = BidSorter::new(top_bid_tx); let sim_manager = SimulatorManager::new(config.simulators.clone(), event_tx.clone()); - let ctx = - Context::new(chain_info, config, sim_manager, db, bid_sorter, cache, bid_adjustor); + let ctx = 
Context::new( + config, + sim_manager, + db, + bid_sorter, + cache, + bid_adjustor, + event_tx.clone(), + ); let id = format!("auctioneer_{auctioneer_core}"); let auctioneer = Auctioneer { ctx, state: State::default(), tel: Telemetry::new(id) }; @@ -338,6 +334,28 @@ impl State { ctx.handle_simulation_result((id, None)); } + // Dry run adjustments to validate bids throughout the slot + // to be able to disable them before get_header is called + (State::Sorting(slot_data), Event::DryRunAdjustments) => { + let Some(best_block_hash) = ctx.bid_sorter.get_any_top_bid() else { + warn!("adjustments dry run - no bids present yet"); + return; + }; + + let Some(original_bid) = ctx.payloads.get(&best_block_hash) else { + error!( + "adjustments dry run - failed to get payload from bid sorter, this should never happen!" + ); + return; + }; + + if let Some((_, sim_request)) = + ctx.bid_adjustor.try_apply_adjustments(original_bid, slot_data, true) + { + ctx.sim_manager.handle_sim_request(sim_request, true); + } + } + ///////////// VALID STATES / EVENTS ///////////// // submission @@ -380,7 +398,7 @@ impl State { warn!(req =% params.pubkey, this =% slot_data.registration_data.entry.registration.message.pubkey, "get header for mismatched proposer"); let _ = res_tx.send(Err(ProposerApiError::NoBidPrepared)); } else { - ctx.handle_get_header(params, res_tx) + ctx.handle_get_header(params, slot_data, res_tx) } trace!("finished processing"); @@ -397,12 +415,12 @@ impl State { if let Some(local) = ctx.payloads.get(&block_hash) { if let Some(block_hash) = ctx.handle_get_payload( - local.payload_and_blobs.clone(), + local.payload_and_blobs(), *blinded, trace, res_tx, slot_data, - local.bid_data.clone(), + local.bid_data_ref().to_owned(), ) { info!(bid_slot =% slot_data.bid_slot, %block_hash, "broadcasting block"); *self = State::Broadcasting { slot_data: slot_data.clone(), block_hash } @@ -550,6 +568,8 @@ impl State { warn!(curr =% bid_slot, gossip_slot = payload.slot, "received early or late gossip payload"); } } + + (State::Slot { .. } | State::Broadcasting { .. 
}, Event::DryRunAdjustments) => {} } } @@ -567,7 +587,8 @@ impl State { match (registration_data, payload_attributes_map.is_empty()) { (Some(registration_data), false) => { - let current_fork = ctx.chain_info.fork_at_slot(bid_slot); + let chain_info = ctx.cache.get_chain_info().expect("chain info should be cached"); + let current_fork = chain_info.fork_at_slot(bid_slot); info!(%bid_slot, attributes = payload_attributes_map.len(), "processed slot data, start sorting"); diff --git a/crates/relay/src/auctioneer/simulator/manager.rs b/crates/relay/src/auctioneer/simulator/manager.rs index d6a0147b..30e2f109 100644 --- a/crates/relay/src/auctioneer/simulator/manager.rs +++ b/crates/relay/src/auctioneer/simulator/manager.rs @@ -15,14 +15,13 @@ use helix_types::{BlsPublicKeyBytes, SignedBidSubmission, SubmissionVersion}; use tokio::sync::oneshot; use tracing::{debug, error, info, warn}; -use crate::{ - api::service::SIMULATOR_REQUEST_TIMEOUT, - auctioneer::{ - simulator::{BlockMergeRequest, SimulatorRequest, client::SimulatorClient}, - types::{Event, SubmissionResult}, - }, +use crate::auctioneer::{ + simulator::{BlockMergeRequest, SimulatorRequest, client::SimulatorClient}, + types::{Event, SubmissionResult}, }; +pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(20); + #[derive(Default)] struct LocalTelemetry { sims_reqs: usize, @@ -58,6 +57,7 @@ pub struct SimulationResultInner { pub struct SimulatorManager { simulators: Vec, requests: PendingRquests, + priority_requests: PendingRquests, last_bid_slot: u64, local_telemetry: LocalTelemetry, sim_result_tx: crossbeam_channel::Sender, @@ -81,6 +81,7 @@ impl SimulatorManager { .collect(); let requests = PendingRquests::with_capacity(200); + let priority_requests = PendingRquests::with_capacity(30); if !is_local_dev() { spawn_tracked!({ @@ -105,6 +106,7 @@ impl SimulatorManager { Self { simulators, requests, + priority_requests, last_bid_slot: 0, local_telemetry: LocalTelemetry::default(), @@ -129,13 +131,15 @@ impl SimulatorManager { self.accept_optimistic = new; } - pub fn handle_sim_request(&mut self, req: SimulatorRequest) { + pub fn handle_sim_request(&mut self, req: SimulatorRequest, fast_track: bool) { assert_eq!(req.bid_slot(), self.last_bid_slot); self.local_telemetry.sims_reqs += 1; if let Some(id) = self.next_sim_client() { self.local_telemetry.sims_sent_immediately += 1; self.spawn_sim(id, req) + } else if fast_track { + self.priority_requests.store(req, &mut self.local_telemetry) } else { self.requests.store(req, &mut self.local_telemetry) } @@ -174,7 +178,7 @@ impl SimulatorManager { sim.paused_until = sim.paused_until.max(paused_until); // keep highest pause if let Some(id) = self.next_sim_client() && - let Some(req) = self.requests.next_req() + let Some(req) = self.priority_requests.next_req().or(self.requests.next_req()) { self.spawn_sim(id, req); } @@ -264,6 +268,7 @@ impl SimulatorManager { self.last_bid_slot = bid_slot; self.requests.clear(bid_slot); + self.priority_requests.clear(bid_slot); let now = Instant::now(); for s in self.simulators.iter_mut() { if s.paused_until.is_some_and(|until| until < now) { diff --git a/crates/relay/src/auctioneer/simulator/mod.rs b/crates/relay/src/auctioneer/simulator/mod.rs index 790f9752..1adfbd8d 100644 --- a/crates/relay/src/auctioneer/simulator/mod.rs +++ b/crates/relay/src/auctioneer/simulator/mod.rs @@ -12,7 +12,7 @@ use helix_types::{ }; use tokio::sync::oneshot; -use crate::auctioneer::types::SubmissionResult; +use crate::{SlotData, SubmissionPayload, 
auctioneer::types::SubmissionResult}; pub mod client; pub mod manager; @@ -49,7 +49,7 @@ impl BlockSimRequest { apply_blacklist: proposer_preferences.filtering.is_regional(), proposer_preferences, blobs_bundle: Some(block.blobs_bundle().clone()), - execution_requests: Some(block.execution_requests()), + execution_requests: Some(block.execution_requests_ref().clone()), parent_beacon_block_root, inclusion_list, } @@ -119,6 +119,26 @@ pub struct SimulatorRequest { } impl SimulatorRequest { + pub fn new(bid: &SubmissionPayload, slot_data: &SlotData) -> Self { + let request = BlockSimRequest::new( + slot_data.registration_data.entry.registration.message.gas_limit, + &bid.signed_bid_submission, + slot_data.registration_data.entry.preferences.clone(), + bid.parent_beacon_block_root, + slot_data.il.clone(), + ); + + Self { + request, + is_top_bid: true, + res_tx: None, + submission: bid.signed_bid_submission.clone(), + trace: bid.submission_trace.clone(), + tx_root: bid.tx_root, + version: bid.submission_version, + } + } + pub fn on_receive_ns(&self) -> u64 { self.trace.receive } diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs index d8c6c258..7bb17548 100644 --- a/crates/relay/src/auctioneer/submit_block.rs +++ b/crates/relay/src/auctioneer/submit_block.rs @@ -116,7 +116,8 @@ impl Context { Submission::Dehydrated(dehydrated) => { trace!("hydrating submission"); let start = Instant::now(); - let max_blobs_per_block = self.chain_info.max_blobs_per_block(); + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + let max_blobs_per_block = chain_info.max_blobs_per_block(); let hydrated = dehydrated.hydrate(&mut self.hydration_cache, max_blobs_per_block)?; @@ -176,7 +177,7 @@ impl Context { is_top_bid, slot: submission.slot().as_u64(), block_hash: *submission.block_hash(), - block_value: submission.value(), + block_value: *submission.value(), proposer_fee_recipient: *submission.proposer_fee_recipient(), parent_beacon_block_root: payload_attributes.parent_beacon_block_root, execution_payload: submission.execution_payload_ref().clone(), @@ -202,14 +203,12 @@ impl Context { res_tx: Option>, slot_data: &SlotData, ) { - let inclusion_list = slot_data.il.clone(); - let request = BlockSimRequest::new( slot_data.registration_data.entry.registration.message.gas_limit, &validated.submission, slot_data.registration_data.entry.preferences.clone(), validated.payload_attributes.parent_beacon_block_root, - inclusion_list, + slot_data.il.clone(), ); let req = SimulatorRequest { @@ -222,7 +221,7 @@ impl Context { version: validated.version, }; - self.sim_manager.handle_sim_request(req); + self.sim_manager.handle_sim_request(req, false); let block_hash = *validated.submission.block_hash(); let entry = PayloadEntry::new_submission( @@ -231,6 +230,8 @@ impl Context { validated.tx_root, validated.bid_adjustment_data, validated.version, + validated.trace, + validated.payload_attributes.parent_beacon_block_root, ); self.payloads.insert(block_hash, entry); } diff --git a/crates/relay/src/auctioneer/types.rs b/crates/relay/src/auctioneer/types.rs index e2e4dc27..f2accde3 100644 --- a/crates/relay/src/auctioneer/types.rs +++ b/crates/relay/src/auctioneer/types.rs @@ -20,6 +20,7 @@ use helix_types::{ SubmissionVersion, VersionedSignedProposal, mock_public_key_bytes, }; use rustc_hash::FxHashMap; +use serde::Serialize; use ssz_derive::{Decode, Encode}; use tokio::sync::oneshot; use tracing::debug; @@ -106,15 +107,29 @@ impl Submission { } } -/// 
From a SignedBidSubmission, keep only the fields needed to serve get_header and get_payload, -/// some fields are optional because payloads also arrive via gossip and we only gossip -/// PayloadAndBlobs #[derive(Clone)] -pub struct PayloadEntry { - pub payload_and_blobs: Arc, +pub struct GossipPayload { + pub payload_and_blobs: PayloadAndBlobs, pub bid_data: PayloadBidData, +} + +#[derive(Clone)] +pub struct SubmissionPayload { + pub signed_bid_submission: SignedBidSubmission, + pub withdrawals_root: B256, + pub tx_root: Option, pub bid_adjustment_data: Option, - pub submission_version: Option, + pub is_adjusted: bool, + pub submission_version: SubmissionVersion, + pub submission_trace: SubmissionTrace, + pub parent_beacon_block_root: Option, +} + +#[allow(clippy::large_enum_variant)] +#[derive(Clone)] +pub enum PayloadEntry { + Submission(SubmissionPayload), + Gossip(GossipPayload), } impl PayloadEntry { @@ -124,40 +139,75 @@ impl PayloadEntry { tx_root: Option, bid_adjustment_data: Option, submission_version: SubmissionVersion, + submission_trace: SubmissionTrace, + parent_beacon_block_root: Option, ) -> Self { - Self { - payload_and_blobs: signed_bid_submission.payload_and_blobs_ref().to_owned().into(), - bid_data: PayloadBidData { - withdrawals_root, - tx_root, - execution_requests: signed_bid_submission.execution_requests().clone(), - value: signed_bid_submission.value(), - builder_pubkey: *signed_bid_submission.builder_public_key(), - }, + Self::Submission(SubmissionPayload { + signed_bid_submission, + withdrawals_root, + tx_root, bid_adjustment_data, - submission_version: Some(submission_version), + is_adjusted: false, + submission_version, + submission_trace, + parent_beacon_block_root, + }) + } + + pub fn new_gossip(payload_and_blobs: PayloadAndBlobs, bid_data: PayloadBidData) -> Self { + Self::Gossip(GossipPayload { payload_and_blobs, bid_data }) + } + + pub fn is_adjusted(&self) -> bool { + matches!(self, Self::Submission(s) if s.is_adjusted) + } + + pub fn value(&self) -> &U256 { + match &self { + Self::Submission(s) => s.signed_bid_submission.value(), + Self::Gossip(s) => &s.bid_data.value, } } - pub fn new_gossip(data: BroadcastPayloadParams) -> Self { - Self { - payload_and_blobs: data.execution_payload, - bid_data: data.bid_data, - bid_adjustment_data: None, - submission_version: None, + pub fn payload_and_blobs(&self) -> PayloadAndBlobs { + match &self { + Self::Submission(s) => s.signed_bid_submission.payload_and_blobs(), + Self::Gossip(s) => s.payload_and_blobs.clone(), } } - pub fn value(&self) -> &U256 { - &self.bid_data.value + pub fn parent_hash(&self) -> &B256 { + match &self { + Self::Submission(s) => &s.signed_bid_submission.execution_payload_ref().parent_hash, + Self::Gossip(s) => &s.payload_and_blobs.execution_payload.parent_hash, + } + } + + pub fn bid_data_ref(&self) -> PayloadBidDataRef<'_> { + match &self { + Self::Submission(s) => PayloadBidDataRef { + withdrawals_root: &s.withdrawals_root, + tx_root: &s.tx_root, + execution_requests: s.signed_bid_submission.execution_requests_ref(), + value: s.signed_bid_submission.value(), + builder_pubkey: s.signed_bid_submission.builder_public_key(), + }, + Self::Gossip(s) => PayloadBidDataRef::from(&s.bid_data), + } } pub fn execution_payload(&self) -> &ExecutionPayload { - &self.payload_and_blobs.execution_payload + match self { + Self::Submission(bid) => bid.signed_bid_submission.execution_payload_ref(), + Self::Gossip(bid) => &bid.payload_and_blobs.execution_payload, + } } pub fn execution_payload_make_mut(&mut 
self) -> &mut ExecutionPayload { - &mut Arc::make_mut(&mut self.payload_and_blobs).execution_payload + match self { + Self::Submission(bid) => bid.signed_bid_submission.execution_payload_make_mut(), + Self::Gossip(bid) => Arc::make_mut(&mut bid.payload_and_blobs.execution_payload), + } } pub fn block_hash(&self) -> &B256 { @@ -168,16 +218,28 @@ impl PayloadEntry { pub fn into_builder_bid_slow(self) -> BuilderBid { let start = Instant::now(); - let header = self - .payload_and_blobs - .execution_payload - .to_header(Some(self.bid_data.withdrawals_root), self.bid_data.tx_root); + let (withdrawals_root, tx_root, execution_requests, commitments) = match &self { + Self::Submission(s) => ( + Some(s.withdrawals_root), + s.tx_root, + s.signed_bid_submission.execution_requests_ref().clone(), + s.signed_bid_submission.blobs_bundle().commitments().to_owned(), + ), + Self::Gossip(s) => ( + None, + None, + s.bid_data.execution_requests.clone(), + s.payload_and_blobs.blobs_bundle.commitments().to_owned(), + ), + }; + + let header = self.execution_payload().to_header(withdrawals_root, tx_root); let bid = BuilderBid { header, - blob_kzg_commitments: self.payload_and_blobs.blobs_bundle.commitments().clone(), - value: self.bid_data.value, - execution_requests: self.bid_data.execution_requests, + blob_kzg_commitments: commitments, + value: *self.value(), + execution_requests, pubkey: mock_public_key_bytes(), }; @@ -197,6 +259,38 @@ pub struct PayloadBidData { pub builder_pubkey: BlsPublicKeyBytes, } +#[derive(Clone, PartialEq, Debug, Encode, Serialize)] +pub struct PayloadBidDataRef<'a> { + pub withdrawals_root: &'a B256, + pub tx_root: &'a Option, + pub execution_requests: &'a Arc, + pub value: &'a U256, + pub builder_pubkey: &'a BlsPublicKeyBytes, +} + +impl<'a> From<&'a PayloadBidData> for PayloadBidDataRef<'a> { + fn from(bid_data: &'a PayloadBidData) -> Self { + Self { + withdrawals_root: &bid_data.withdrawals_root, + tx_root: &bid_data.tx_root, + execution_requests: &bid_data.execution_requests, + value: &bid_data.value, + builder_pubkey: &bid_data.builder_pubkey, + } + } +} + +impl PayloadBidDataRef<'_> { + pub fn to_owned(&self) -> PayloadBidData { + let withdrawals_root = *self.withdrawals_root; + let tx_root = *self.tx_root; + let execution_requests = self.execution_requests.clone(); + let value = *self.value; + let builder_pubkey = *self.builder_pubkey; + PayloadBidData { withdrawals_root, tx_root, execution_requests, value, builder_pubkey } + } +} + pub enum SubWorkerJob { BlockSubmission { headers: http::HeaderMap, @@ -295,6 +389,7 @@ pub enum Event { is_synced: bool, }, MergeResult(BlockMergeResult), + DryRunAdjustments, } impl Event { @@ -308,6 +403,7 @@ impl Event { Event::SimResult(_) => "SimResult", Event::SimulatorSync { .. 
} => "SimulatorSync", Event::MergeResult(_) => "MergeResult", + Event::DryRunAdjustments => "DryRunAdjustments", } } } diff --git a/crates/relay/src/auctioneer/worker.rs b/crates/relay/src/auctioneer/worker.rs index 91719464..6347fc0d 100644 --- a/crates/relay/src/auctioneer/worker.rs +++ b/crates/relay/src/auctioneer/worker.rs @@ -91,7 +91,6 @@ pub(super) struct SubWorker { id: String, tx: crossbeam_channel::Sender, cache: LocalCache, - chain_info: ChainInfo, tel: Telemetry, config: RelayConfig, } @@ -101,17 +100,9 @@ impl SubWorker { id: usize, tx: crossbeam_channel::Sender, cache: LocalCache, - chain_info: ChainInfo, config: RelayConfig, ) -> Self { - Self { - id: format!("submission_{id}"), - tx, - cache, - chain_info, - tel: Telemetry::default(), - config, - } + Self { id: format!("submission_{id}"), tx, cache, tel: Telemetry::default(), config } } pub(super) fn run(mut self, rx: crossbeam_channel::Receiver) { @@ -293,23 +284,25 @@ impl SubWorker { block_merging_dry_run: self.config.block_merging_config.is_dry_run, }; + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + let (submission, merging_data, bid_adjustment_data) = match submission_type { Some(SubmissionType::Default) => { - decode_default(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_default(&mut decoder, body, trace, &chain_info, &flags)? } Some(SubmissionType::Merge) => { - decode_merge(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_merge(&mut decoder, body, trace, &chain_info, &flags)? } Some(SubmissionType::Dehydrated) => { - decode_dehydrated(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_dehydrated(&mut decoder, body, trace, &chain_info, &flags)? } None => { if should_hydrate { - decode_dehydrated(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_dehydrated(&mut decoder, body, trace, &chain_info, &flags)? } else if has_mergeable_data { - decode_merge(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_merge(&mut decoder, body, trace, &chain_info, &flags)? } else { - decode_default(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_default(&mut decoder, body, trace, &chain_info, &flags)? 
} } }; @@ -329,7 +322,8 @@ impl SubWorker { _trace: &mut GetPayloadTrace, ) -> Result<(SignedBlindedBeaconBlock, B256), ProposerApiError> { trace!("verifying signature"); - verify_signed_blinded_block_signature(&self.chain_info, &blinded_block, proposer_pubkey)?; + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + verify_signed_blinded_block_signature(&chain_info, &blinded_block, proposer_pubkey)?; trace!("signature verified"); let block_hash = blinded_block @@ -347,13 +341,13 @@ impl SubWorker { /// Worker to process registrations verifications pub(super) struct RegWorker { id: String, - chain_info: ChainInfo, + local_cache: LocalCache, tel: Telemetry, } impl RegWorker { - pub(super) fn new(id: usize, chain_info: ChainInfo) -> Self { - Self { id: format!("registration_{id}"), chain_info, tel: Default::default() } + pub(super) fn new(id: usize, local_cache: LocalCache) -> Self { + Self { id: format!("registration_{id}"), local_cache, tel: Default::default() } } pub(super) fn run(mut self, rx: crossbeam_channel::Receiver) { @@ -396,7 +390,9 @@ impl RegWorker { } let start = Instant::now(); - let valid = validate_registration(&self.chain_info, ®s[i]); + let chain_info = + self.local_cache.get_chain_info().expect("chain info should be cached"); + let valid = validate_registration(&chain_info, ®s[i]); res.push((i, valid.is_ok())); WORKER_TASK_COUNT.with_label_values(&["Registration", &self.id]).inc(); diff --git a/crates/relay/src/database/handle.rs b/crates/relay/src/database/handle.rs new file mode 100644 index 00000000..2e747893 --- /dev/null +++ b/crates/relay/src/database/handle.rs @@ -0,0 +1,206 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use alloy_primitives::{B256, U256}; +use helix_common::{ + DataAdjustmentsEntry, GetHeaderTrace, GetPayloadTrace, GossipedPayloadTrace, SubmissionTrace, + ValidatorSummary, + api::{ + builder_api::{BuilderGetValidatorsResponseEntry, InclusionListWithMetadata}, + proposer_api::GetHeaderParams, + }, + bid_submission::OptimisticVersion, + utils::alert_discord, +}; +use helix_types::{BlsPublicKeyBytes, SignedBidSubmission}; +use tracing::error; + +use crate::database::{ + BuilderInfoDocument, SavePayloadParams, + postgres::postgres_db_service::{DbRequest, PendingBlockSubmissionValue}, +}; + +#[derive(Clone)] +pub struct DbHandle { + sender: crossbeam_channel::Sender, + batch_sender: crossbeam_channel::Sender, +} + +impl DbHandle { + pub fn new( + sender: crossbeam_channel::Sender, + batch_sender: crossbeam_channel::Sender, + ) -> Self { + Self { sender, batch_sender } + } + + pub fn save_too_late_get_payload( + &self, + slot: u64, + proposer_pub_key: BlsPublicKeyBytes, + payload_hash: B256, + message_received: u64, + payload_fetched: u64, + ) { + if let Err(err) = self.sender.try_send(DbRequest::SaveTooLateGetPayload { + slot, + proposer_pub_key, + payload_hash, + message_received, + payload_fetched, + }) { + error!(%err, "failed to send SaveTooLateGetPayload request"); + } + } + + pub fn save_delivered_payload(&self, params: SavePayloadParams) { + if let Err(err) = self.sender.try_send(DbRequest::SaveDeliveredPayload { params }) { + error!(%err, "failed to send SaveDeliveredPayload request"); + } + } + + pub fn store_builders_info(&self, builders: Vec) { + if let Err(err) = self.sender.send(DbRequest::StoreBuildersInfo { builders }) { + error!(%err, "failed to send StoreBuildersInfo request"); + } + } + + pub fn db_demote_builder( + &self, + slot: u64, + builder_pub_key: BlsPublicKeyBytes, + 
block_hash: B256, + reason: String, + failsafe_triggered: Arc, + ) { + if let Err(err) = self.sender.send(DbRequest::DbDemoteBuilder { + slot, + builder_pub_key, + block_hash, + reason, + failsafe_triggered: failsafe_triggered.clone(), + }) { + error!(%err, "failed to send DbDemoteBuilder request; triggering failsafe: stopping all optimistic submissions"); + failsafe_triggered.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} {} failed to demote builder in database! Pausing all optimistic submissions", + builder_pub_key, err, block_hash + )); + } + } + + pub fn store_block_submission( + &self, + submission: SignedBidSubmission, + trace: SubmissionTrace, + optimistic_version: OptimisticVersion, + ) { + if let Err(err) = self.batch_sender.send(PendingBlockSubmissionValue { + submission, + trace, + optimistic_version, + }) { + error!(%err, "failed to store block submission"); + } + } + + pub fn save_get_header_call( + &self, + params: GetHeaderParams, + best_block_hash: B256, + value: U256, + trace: GetHeaderTrace, + mev_boost: bool, + user_agent: Option, + ) { + if let Err(err) = self.sender.send(DbRequest::SaveGetHeaderCall { + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + }) { + error!(%err, "failed to send SaveGetHeaderCall request"); + } + } + + pub fn save_failed_get_payload( + &self, + slot: u64, + block_hash: B256, + error: String, + trace: GetPayloadTrace, + ) { + if let Err(err) = + self.sender.send(DbRequest::SaveFailedGetPayload { slot, block_hash, error, trace }) + { + error!(%err, "failed to send SaveFailedGetPayload request"); + } + } + + pub fn save_gossiped_payload_trace(&self, block_hash: B256, trace: GossipedPayloadTrace) { + if let Err(err) = + self.sender.send(DbRequest::SaveGossipedPayloadTrace { block_hash, trace }) + { + error!(%err, "failed to send SaveGossipedPayloadTrace request"); + } + } + + pub fn save_inclusion_list( + &self, + inclusion_list: InclusionListWithMetadata, + slot: u64, + block_parent_hash: B256, + proposer_pubkey: BlsPublicKeyBytes, + ) { + if let Err(err) = self.sender.send(DbRequest::SaveInclusionList { + inclusion_list, + slot, + block_parent_hash, + proposer_pubkey, + }) { + error!(%err, "failed to send SaveInclusionList request"); + } + } + + pub fn save_block_adjustments_data(&self, entry: DataAdjustmentsEntry) { + if let Err(err) = self.sender.send(DbRequest::SaveBlockAdjustmentsData { entry }) { + error!(%err, "failed to send SaveBlockAdjustmentsData request"); + } + } + + pub fn set_known_validators(&self, known_validators: Vec) { + if let Err(err) = self.sender.try_send(DbRequest::SetKnownValidators { known_validators }) { + error!(%err, "failed to send SetKnownValidators request"); + } + } + + pub fn set_proposer_duties(&self, duties: Vec) { + if let Err(err) = self.sender.try_send(DbRequest::SetProposerDuties { duties }) { + error!(%err, "failed to send SetProposerDuties request"); + } + } + + pub fn disable_adjustments( + &self, + block_hash: B256, + failsafe_trigger: Arc, + adjustments_enabled: Arc, + ) { + if let Err(err) = self.sender.try_send(DbRequest::DisableAdjustments { + block_hash, + failsafe_trigger: failsafe_trigger.clone(), + adjustments_enabled: adjustments_enabled.clone(), + }) { + error!(%err, "failed to send DisableAdjustments request; triggering failsafe: stopping all adjustments"); + failsafe_trigger.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} failed to disable adjustments in database! 
Pausing all adjustments", + block_hash, err + )); + } + } +} diff --git a/crates/relay/src/database/mod.rs b/crates/relay/src/database/mod.rs index dbbbd73b..91915a8a 100644 --- a/crates/relay/src/database/mod.rs +++ b/crates/relay/src/database/mod.rs @@ -1,4 +1,5 @@ pub mod error; +pub mod handle; pub mod postgres; pub mod types; @@ -10,15 +11,21 @@ use std::{ time::Duration, }; -use helix_common::{RelayConfig, is_local_dev}; +use helix_common::{RelayConfig, is_local_dev, local_cache}; use postgres::postgres_db_service::PostgresDatabaseService; pub use types::*; +pub use crate::database::postgres::postgres_db_service::{DbRequest, PendingBlockSubmissionValue}; + pub async fn start_db_service( config: &RelayConfig, known_validators_loaded: Arc, + db_request_receiver: crossbeam_channel::Receiver, + db_batch_request_receiver: crossbeam_channel::Receiver, + local_cache: Arc, ) -> eyre::Result> { - let mut postgres_db = PostgresDatabaseService::from_relay_config(config).await; + let mut postgres_db = + PostgresDatabaseService::from_relay_config(config, local_cache.clone()).await; if !is_local_dev() { postgres_db.init_forever().await; @@ -34,8 +41,13 @@ pub async fn start_db_service( tokio::spawn({ let known_validators_loaded = known_validators_loaded.clone(); let postgres_db = postgres_db.clone(); + let local_cache = local_cache.clone(); async move { postgres_db.load_known_validators().await; + postgres_db.load_validator_registrations().await; + postgres_db.load_builder_infos(local_cache.clone()).await; + postgres_db.load_trusted_proposers(local_cache.clone()).await; + postgres_db.load_validator_pools().await; known_validators_loaded.store(true, Ordering::Relaxed); if should_refresh_cache { @@ -46,14 +58,17 @@ pub async fn start_db_service( loop { tick.tick().await; postgres_db.load_known_validators().await; + postgres_db.load_validator_registrations().await; + postgres_db.load_builder_infos(local_cache.clone()).await; + postgres_db.load_trusted_proposers(local_cache.clone()).await; + postgres_db.load_validator_pools().await; } } } }); } - //postgres_db.load_validator_registrations().await; - postgres_db.start_processors().await; + postgres_db.start_processors(db_request_receiver, db_batch_request_receiver).await; Ok(Arc::new(postgres_db)) } diff --git a/crates/relay/src/database/postgres/migrations/V39__relay_info.sql b/crates/relay/src/database/postgres/migrations/V39__relay_info.sql new file mode 100644 index 00000000..8593f26e --- /dev/null +++ b/crates/relay/src/database/postgres/migrations/V39__relay_info.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS relay_info ( + "version" integer PRIMARY KEY, + "adjustments_enabled" boolean NOT NULL DEFAULT true +); + +INSERT INTO relay_info ("version") +VALUES (1) +ON CONFLICT ("version") DO NOTHING; + +ALTER TABLE "bid_adjustments" + ADD COLUMN IF NOT EXISTS is_dry_run boolean; diff --git a/crates/relay/src/database/postgres/postgres_db_service.rs b/crates/relay/src/database/postgres/postgres_db_service.rs index 88676a46..e40d1b8b 100644 --- a/crates/relay/src/database/postgres/postgres_db_service.rs +++ b/crates/relay/src/database/postgres/postgres_db_service.rs @@ -2,34 +2,33 @@ use std::{ ops::DerefMut, sync::{ Arc, - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; use alloy_primitives::{B256, U256}; -use dashmap::{DashMap, DashSet}; use deadpool_postgres::{Config, GenericClient, ManagerConfig, Pool, RecyclingMethod}; use helix_common::{ - BuilderInfo, DataAdjustmentsEntry, 
Filtering, GetHeaderTrace, GetPayloadTrace, - GossipedPayloadTrace, PostgresConfig, ProposerInfo, RelayConfig, - SignedValidatorRegistrationEntry, SubmissionTrace, ValidatorPreferences, ValidatorSummary, + DataAdjustmentsEntry, Filtering, GetHeaderTrace, GetPayloadTrace, GossipedPayloadTrace, + PostgresConfig, ProposerInfo, RelayConfig, SignedValidatorRegistrationEntry, SubmissionTrace, + ValidatorPreferences, ValidatorSummary, api::{ builder_api::{BuilderGetValidatorsResponseEntry, InclusionListWithMetadata}, data_api::{ BidFilters, DataAdjustmentsResponse, ProposerHeaderDeliveredParams, ProposerHeaderDeliveredResponse, }, - proposer_api::{GetHeaderParams, ValidatorRegistrationInfo}, + proposer_api::GetHeaderParams, }, bid_submission::OptimisticVersion, + local_cache::LocalCache, metrics::DbMetricRecord, - utils::utcnow_ms, + utils::{alert_discord, utcnow_ms}, }; -use helix_types::{BlsPublicKeyBytes, SignedBidSubmission, SignedValidatorRegistration, Slot}; -use parking_lot::RwLock; +use helix_types::{BlsPublicKeyBytes, SignedBidSubmission, Slot}; use rustc_hash::FxHashSet; -use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tokio_postgres::{NoTls, types::ToSql}; use tracing::{error, info, instrument, warn}; @@ -46,7 +45,97 @@ use crate::database::{ }, }; -struct PendingBlockSubmissionValue { +pub enum DbRequest { + GetValidatorRegistration { + pub_key: BlsPublicKeyBytes, + response: oneshot::Sender>, + }, + GetValidatorRegistrationsForPubKeys { + pub_keys: Vec, + response: oneshot::Sender, DatabaseError>>, + }, + SaveTooLateGetPayload { + slot: u64, + proposer_pub_key: BlsPublicKeyBytes, + payload_hash: B256, + message_received: u64, + payload_fetched: u64, + }, + SaveDeliveredPayload { + params: SavePayloadParams, + }, + StoreBuildersInfo { + builders: Vec, + }, + DbDemoteBuilder { + slot: u64, + builder_pub_key: BlsPublicKeyBytes, + block_hash: B256, + reason: String, + failsafe_triggered: Arc, + }, + GetBids { + filters: BidFilters, + validator_preferences: Arc, + response: oneshot::Sender, DatabaseError>>, + }, + GetDeliveredPayloads { + filters: BidFilters, + validator_preferences: Arc, + response: oneshot::Sender, DatabaseError>>, + }, + SaveGetHeaderCall { + params: GetHeaderParams, + best_block_hash: B256, + value: U256, + trace: GetHeaderTrace, + mev_boost: bool, + user_agent: Option, + }, + SaveFailedGetPayload { + slot: u64, + block_hash: B256, + error: String, + trace: GetPayloadTrace, + }, + SaveGossipedPayloadTrace { + block_hash: B256, + trace: GossipedPayloadTrace, + }, + GetTrustedProposers { + response: oneshot::Sender, DatabaseError>>, + }, + SaveInclusionList { + inclusion_list: InclusionListWithMetadata, + slot: u64, + block_parent_hash: B256, + proposer_pubkey: BlsPublicKeyBytes, + }, + GetBlockAdjustmentsForSlot { + slot: Slot, + response: oneshot::Sender, DatabaseError>>, + }, + SaveBlockAdjustmentsData { + entry: DataAdjustmentsEntry, + }, + GetProposerHeaderDelivered { + params: ProposerHeaderDeliveredParams, + response: oneshot::Sender, DatabaseError>>, + }, + SetKnownValidators { + known_validators: Vec, + }, + SetProposerDuties { + duties: Vec, + }, + DisableAdjustments { + block_hash: B256, + failsafe_trigger: Arc, + adjustments_enabled: Arc, + }, +} + +pub struct PendingBlockSubmissionValue { pub submission: SignedBidSubmission, pub trace: SubmissionTrace, pub optimistic_version: OptimisticVersion, @@ -54,6 +143,7 @@ struct PendingBlockSubmissionValue { const BLOCK_SUBMISSION_FIELD_COUNT: usize = 16; const MAINNET_VALIDATOR_COUNT: usize = 
1_100_000; +const DB_CHECK_INTERVAL: Duration = Duration::from_secs(1); static DELIVERED_PAYLOADS_MIG_SLOT: AtomicU64 = AtomicU64::new(0); fn new_validator_set() -> FxHashSet { @@ -85,18 +175,17 @@ struct TrustedProposerParams { #[derive(Clone)] pub struct PostgresDatabaseService { - validator_registration_cache: Arc>, - pending_validator_registrations: Arc>, - block_submissions_sender: Option>, - known_validators_cache: Arc>>, - validator_pool_cache: Arc>, region: i16, pub pool: Arc, pub high_priority_pool: Arc, + pub local_cache: Arc, } impl PostgresDatabaseService { - pub async fn from_relay_config(relay_config: &RelayConfig) -> Self { + pub async fn from_relay_config( + relay_config: &RelayConfig, + local_cache: Arc, + ) -> Self { let mut cfg = Config::new(); cfg.host = Some(relay_config.postgres.hostname.clone()); cfg.port = Some(relay_config.postgres.port); @@ -126,14 +215,10 @@ impl PostgresDatabaseService { }; PostgresDatabaseService { - validator_registration_cache: Arc::new(DashMap::new()), - pending_validator_registrations: Arc::new(DashSet::new()), - block_submissions_sender: None, - known_validators_cache: Arc::new(RwLock::new(new_validator_set())), - validator_pool_cache: Arc::new(DashMap::new()), region: relay_config.postgres.region, pool: Arc::new(pool), high_priority_pool: Arc::new(high_priority_pool), + local_cache, } } @@ -191,16 +276,36 @@ impl PostgresDatabaseService { pub async fn load_known_validators(&self) { let mut record = DbMetricRecord::new("load_known_validators"); - let client = self.pool.get().await.unwrap(); - let rows = client.query("SELECT * FROM known_validators", &[]).await.unwrap(); + let client = match self.pool.get().await { + Ok(client) => client, + Err(e) => { + error!("Error getting client from pool: {}", e); + return; + } + }; + + let rows = match client.query("SELECT * FROM known_validators", &[]).await { + Ok(rows) => rows, + Err(e) => { + error!("Error querying known_validators: {}", e); + return; + } + }; + let mut set = new_validator_set(); for row in rows { let public_key: BlsPublicKeyBytes = - parse_bytes_to_pubkey_bytes(row.get::<&str, &[u8]>("public_key")).unwrap(); + match parse_bytes_to_pubkey_bytes(row.get::<&str, &[u8]>("public_key")) { + Ok(pk) => pk, + Err(e) => { + error!("Error parsing public key: {}", e); + continue; + } + }; set.insert(public_key); } - *self.known_validators_cache.write() = set; + *self.local_cache.known_validators_cache.write() = set; record.record_success(); } @@ -213,7 +318,8 @@ impl PostgresDatabaseService { Ok(entries) => { let num_entries = entries.len(); entries.into_iter().for_each(|entry| { - self.validator_registration_cache + self.local_cache + .validator_registration_cache .insert(entry.registration_info.registration.message.pubkey, entry); }); info!("Loaded {} validator registrations", num_entries); @@ -225,9 +331,71 @@ impl PostgresDatabaseService { } } - pub async fn start_processors(&mut self) { + pub async fn start_processors( + &mut self, + db_request_receiver: crossbeam_channel::Receiver, + block_submission_receiver: crossbeam_channel::Receiver, + ) { self.start_registration_processor().await; - self.start_block_submission_processor().await; + self.start_block_submission_processor(block_submission_receiver).await; + self.start_db_request_processor(db_request_receiver).await; + self.start_check_flag_task(); + } + + fn start_check_flag_task(&mut self) { + let svc_clone = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(DB_CHECK_INTERVAL); + loop { + 
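+ // Each tick, mirror relay_info.adjustments_enabled into the local cache;
+ // a locally latched failsafe takes precedence and permanently stops this task.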
interval.tick().await; + + if svc_clone.local_cache.adjustments_failsafe_trigger.load(Ordering::Relaxed) { + svc_clone.local_cache.adjustments_enabled.store(false, Ordering::Relaxed); + return; + } + + match svc_clone.check_adjustments_enabled().await { + Ok(value) => { + let previous = svc_clone + .local_cache + .adjustments_enabled + .swap(value, Ordering::Relaxed); + if previous != value { + tracing::info!( + "adjustments enabled flag changed from {} to {}", + previous, + value + ); + } + } + Err(e) => tracing::error!("failed to check adjustments_enabled flag: {}", e), + } + } + }); + } + + pub async fn start_db_request_processor( + &mut self, + receiver: crossbeam_channel::Receiver, + ) { + let svc_clone = self.clone(); + + tokio::spawn(async move { + loop { + match receiver.recv() { + Ok(request) => { + let svc = svc_clone.clone(); + tokio::spawn(async move { + svc.handle_db_request(request).await; + }); + } + Err(e) => { + error!("DB request receiver error: {}", e); + break; + } + } + } + }); } pub async fn start_registration_processor(&self) { @@ -236,12 +404,13 @@ impl PostgresDatabaseService { tokio::spawn(async move { loop { interval.tick().await; - match self_clone.pending_validator_registrations.len() { + match self_clone.local_cache.pending_validator_registrations.len() { 0 => continue, _ => { let mut entries = Vec::new(); - for key in self_clone.pending_validator_registrations.iter() { - if let Some(entry) = self_clone.validator_registration_cache.get(&*key) + for key in self_clone.local_cache.pending_validator_registrations.iter() { + if let Some(entry) = + self_clone.local_cache.validator_registration_cache.get(&*key) { entries.push(entry.clone()); } @@ -249,7 +418,7 @@ impl PostgresDatabaseService { match self_clone._save_validator_registrations(&entries).await { Ok(_) => { for entry in entries.iter() { - self_clone.pending_validator_registrations.remove( + self_clone.local_cache.pending_validator_registrations.remove( &entry.registration_info.registration.message.pubkey, ); } @@ -265,10 +434,10 @@ impl PostgresDatabaseService { }); } - pub async fn start_block_submission_processor(&mut self) { - let (block_submissions_sender, mut block_submissions_receiver) = - tokio::sync::mpsc::channel(10_000); - self.block_submissions_sender = Some(block_submissions_sender); + pub async fn start_block_submission_processor( + &mut self, + batch_receiver: crossbeam_channel::Receiver, + ) { let svc_clone = self.clone(); tokio::spawn(async move { let mut batch = Vec::with_capacity(2_000); @@ -276,10 +445,12 @@ impl PostgresDatabaseService { let mut last_slot_processed = 0; loop { tokio::select! 
{ - Some(item) = block_submissions_receiver.recv() => { - batch.push(item); - } _ = ticker.tick() => { + // Drain all available messages from the crossbeam channel + while let Ok(item) = batch_receiver.try_recv() { + batch.push(item); + } + if !batch.is_empty() { let mut retry_count = 0; @@ -308,6 +479,196 @@ }); } + pub async fn load_builder_infos(&self, local_cache: Arc) { + let mut record = DbMetricRecord::new("load_builder_infos"); + + match self.get_all_builder_infos().await { + Ok(builder_infos) => { + local_cache.update_builder_infos(&builder_infos, true); + info!("Loaded {} builder infos", builder_infos.len()); + record.record_success(); + } + Err(e) => { + error!("Error loading builder infos: {}", e); + } + } + } + + pub async fn load_trusted_proposers(&self, local_cache: Arc) { + let mut record = DbMetricRecord::new("load_trusted_proposers"); + + match self.get_trusted_proposers().await { + Ok(proposers) => { + let num_proposers = proposers.len(); + local_cache.update_trusted_proposers(proposers); + info!("Loaded {} trusted proposers", num_proposers); + record.record_success(); + } + Err(e) => { + error!("Error loading trusted proposers: {}", e); + } + } + } + + async fn handle_db_request(&self, request: DbRequest) { + match request { + DbRequest::GetValidatorRegistration { pub_key, response } => { + let result = self.get_validator_registration(&pub_key).await; + let _ = response.send(result); + } + DbRequest::GetValidatorRegistrationsForPubKeys { pub_keys, response } => { + let pub_key_refs: Vec<&BlsPublicKeyBytes> = pub_keys.iter().collect(); + let result = self.get_validator_registrations_for_pub_keys(&pub_key_refs).await; + let _ = response.send(result); + } + DbRequest::SaveTooLateGetPayload { + slot, + proposer_pub_key, + payload_hash, + message_received, + payload_fetched, + } => { + if let Err(err) = self + .save_too_late_get_payload( + slot, + &proposer_pub_key, + &payload_hash, + message_received, + payload_fetched, + ) + .await + { + error!(%err, "failed to save too late get payload"); + } + } + DbRequest::SaveDeliveredPayload { params } => { + if let Err(err) = self.save_delivered_payload(&params).await { + error!(%err, "error saving payload to database"); + } + } + DbRequest::StoreBuildersInfo { builders } => { + if let Err(err) = self.store_builders_info(&builders).await { + error!(%err, "failed to store builders info"); + } + } + DbRequest::DbDemoteBuilder { + slot, + builder_pub_key, + block_hash, + reason, + failsafe_triggered, + } => { + if let Err(err) = + self.db_demote_builder(slot, &builder_pub_key, &block_hash, reason).await + { + error!(%err, "error demoting builder in database; triggering failsafe: stopping all optimistic submissions"); + failsafe_triggered.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} {} failed to demote builder in database! 
Pausing all optimistic submissions", + builder_pub_key, err, block_hash + )); + } + } + DbRequest::GetBids { filters, validator_preferences, response } => { + let result = self.get_bids(&filters, validator_preferences).await; + let _ = response.send(result); + } + DbRequest::GetDeliveredPayloads { filters, validator_preferences, response } => { + let result = self.get_delivered_payloads(&filters, validator_preferences).await; + let _ = response.send(result); + } + DbRequest::SaveGetHeaderCall { + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + } => { + if let Err(err) = self + .save_get_header_call( + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + ) + .await + { + error!(%err, "error saving get header call to database"); + } + } + DbRequest::SaveFailedGetPayload { slot, block_hash, error, trace } => { + if let Err(err) = self.save_failed_get_payload(slot, block_hash, error, trace).await + { + error!(%err, "error saving failed get payload"); + } + } + DbRequest::SaveGossipedPayloadTrace { block_hash, trace } => { + if let Err(err) = self.save_gossiped_payload_trace(block_hash, trace).await { + error!(%err, "failed to store gossiped payload trace") + } + } + DbRequest::GetTrustedProposers { response } => { + let result = self.get_trusted_proposers().await; + let _ = response.send(result); + } + DbRequest::SaveInclusionList { + inclusion_list, + slot, + block_parent_hash, + proposer_pubkey, + } => { + if let Err(err) = self + .save_inclusion_list( + &inclusion_list, + slot, + &block_parent_hash, + &proposer_pubkey, + ) + .await + { + error!(%slot, "failed to save inclusion list: {:?}", err); + } + } + DbRequest::GetBlockAdjustmentsForSlot { slot, response } => { + let result = self.get_block_adjustments_for_slot(slot).await; + let _ = response.send(result); + } + DbRequest::SaveBlockAdjustmentsData { entry } => { + if let Err(err) = self.save_block_adjustments_data(entry).await { + error!(%err, "failed to save block adjustments data"); + } + } + DbRequest::GetProposerHeaderDelivered { params, response } => { + let result = self.get_proposer_header_delivered(&params).await; + let _ = response.send(result); + } + DbRequest::SetKnownValidators { known_validators } => { + if let Err(err) = self.set_known_validators(known_validators).await { + error!(%err, "failed to set known validators"); + } + } + DbRequest::SetProposerDuties { duties } => { + if let Err(err) = self.set_proposer_duties(duties).await { + error!(%err, "failed to set proposer duties"); + } + } + DbRequest::DisableAdjustments { block_hash, failsafe_trigger, adjustments_enabled } => { + if let Err(err) = self.disable_adjustments().await { + failsafe_trigger.store(true, Ordering::Relaxed); + adjustments_enabled.store(false, Ordering::Relaxed); + error!(%block_hash, %err, "failed to disable adjustments in database, pulling the failsafe trigger"); + alert_discord(&format!( + "{} {} failed to disable adjustments in database, pulling the failsafe trigger", + err, block_hash + )); + } + } + } + } + + async fn _save_validator_registrations( + &self, + entries: &[SignedValidatorRegistrationEntry], @@ -536,7 +897,7 @@ impl PostgresDatabaseService { proposer_fee_recipient: item.submission.proposer_fee_recipient().as_slice(), gas_limit: item.submission.gas_limit() as i32, gas_used: item.submission.gas_used() as i32, - value: PostgresNumeric::from(item.submission.value()), + value: PostgresNumeric::from(*item.submission.value()), num_txs: item.submission.num_txs() as i32, timestamp: 
item.submission.timestamp() as i64, first_seen: item.trace.receive as i64, @@ -641,59 +1002,15 @@ impl Default for PostgresDatabaseService { let high_priority_pool = cfg.create_pool(None, NoTls).unwrap(); PostgresDatabaseService { - validator_registration_cache: Arc::new(DashMap::new()), - pending_validator_registrations: Arc::new(DashSet::new()), - block_submissions_sender: None, - known_validators_cache: Arc::new(RwLock::new(new_validator_set())), - validator_pool_cache: Arc::new(DashMap::new()), region: 1, pool: Arc::new(pool), high_priority_pool: Arc::new(high_priority_pool), + local_cache: Arc::new(LocalCache::new()), } } } impl PostgresDatabaseService { - /// Assume the entries are already validated - pub fn save_validator_registrations( - &self, - entries: impl Iterator, - pool_name: Option, - user_agent: Option, - ) { - for entry in entries { - self.pending_validator_registrations.insert(entry.registration.message.pubkey); - self.validator_registration_cache.insert( - entry.registration.message.pubkey, - SignedValidatorRegistrationEntry::new( - entry.clone(), - pool_name.clone(), - user_agent.clone(), - ), - ); - } - } - - pub fn is_registration_update_required( - &self, - registration: &SignedValidatorRegistration, - ) -> bool { - if let Some(existing_entry) = - self.validator_registration_cache.get(®istration.message.pubkey) && - existing_entry.registration_info.registration.message.timestamp >= - registration.message.timestamp.saturating_sub(60 * 60) && - existing_entry.registration_info.registration.message.fee_recipient == - registration.message.fee_recipient && - existing_entry.registration_info.registration.message.gas_limit == - registration.message.gas_limit - { - // do registration once per hour, unless fee recipient / gas limit has changed - - return false; - } - true - } - #[instrument(skip_all)] pub async fn get_validator_registration( &self, @@ -737,7 +1054,7 @@ impl PostgresDatabaseService { } #[instrument(skip_all)] - pub async fn get_validator_registrations( + async fn get_validator_registrations( &self, ) -> Result, DatabaseError> { let mut record = DbMetricRecord::new("get_validator_registrations"); @@ -868,177 +1185,6 @@ impl PostgresDatabaseService { Ok(()) } - #[instrument(skip_all)] - pub async fn get_proposer_duties( - &self, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_proposer_duties"); - - let rows = self - .high_priority_pool - .get() - .await? - .query( - " - SELECT * FROM proposer_duties - INNER JOIN validator_registrations - ON proposer_duties.public_key = validator_registrations.public_key - INNER JOIN validator_preferences - ON proposer_duties.public_key = validator_preferences.public_key - WHERE validator_registrations.active = true - ", - &[], - ) - .await?; - - record.record_success(); - parse_rows(rows) - } - - #[instrument(skip_all)] - pub async fn get_validator_preferences( - &self, - pub_key: &BlsPublicKeyBytes, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_validator_preferences"); - - if let Some(cached_entry) = self.validator_registration_cache.get(pub_key) { - record.record_success(); - return Ok(Some(cached_entry.registration_info.preferences.clone())); - } - - let rows = self - .pool - .get() - .await? 
- .query( - " - SELECT - validator_preferences.filtering, - validator_preferences.trusted_builders, - validator_preferences.header_delay, - validator_preferences.delay_ms, - validator_preferences.disable_inclusion_lists - FROM validator_preferences - WHERE validator_preferences.public_key = $1 - ", - &[&(pub_key.as_slice())], - ) - .await?; - - let result = if rows.is_empty() { - None - } else { - let prefs: ValidatorPreferences = parse_row(&rows[0])?; - Some(prefs) - }; - - record.record_success(); - Ok(result) - } - - pub async fn update_validator_preferences( - &self, - pub_key: &BlsPublicKeyBytes, - preferences: &ValidatorPreferences, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("update_validator_preferences"); - - let mut client = self.pool.get().await?; - let transaction = client.transaction().await?; - - let trusted_builders: Option> = - preferences.trusted_builders.as_ref().map(|builders| builders.to_vec()); - - let filtering_value: i16 = match preferences.filtering { - helix_common::Filtering::Regional => 1, - helix_common::Filtering::Global => 0, - }; - - transaction - .execute( - " - INSERT INTO validator_preferences ( - public_key, - filtering, - trusted_builders, - header_delay, - delay_ms, - disable_inclusion_lists, - manual_override - ) VALUES ($1, $2, $3, $4, $5, $6, TRUE) - ON CONFLICT (public_key) - DO UPDATE SET - filtering = EXCLUDED.filtering, - trusted_builders = EXCLUDED.trusted_builders, - header_delay = EXCLUDED.header_delay, - delay_ms = EXCLUDED.delay_ms, - disable_inclusion_lists = EXCLUDED.disable_inclusion_lists, - manual_override = TRUE - ", - &[ - &pub_key.as_slice(), - &filtering_value, - &trusted_builders, - &preferences.header_delay, - &preferences.delay_ms.map(|v| v as i64), - &preferences.disable_inclusion_lists, - ], - ) - .await?; - - transaction.commit().await?; - - if let Some(mut cached_entry) = self.validator_registration_cache.get_mut(pub_key) { - cached_entry.registration_info.preferences = preferences.clone(); - } - - record.record_success(); - - Ok(()) - } - - #[instrument(skip_all)] - pub async fn get_pool_validators( - &self, - pool_name: &str, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_pool_validators"); - - let rows = self - .pool - .get() - .await? 
- .query( - " - SELECT public_key - FROM trusted_proposers - WHERE name = $1 - ", - &[&pool_name], - ) - .await?; - - let mut validators = Vec::new(); - for row in rows { - let pubkey_bytes: Vec = row.try_get("public_key")?; - - if pubkey_bytes.len() == 48 { - let mut key = [0u8; 48]; - key.copy_from_slice(&pubkey_bytes); - validators.push(BlsPublicKeyBytes::from(key)); - } else { - error!( - length = pubkey_bytes.len(), - "Invalid validator pubkey length in trusted_proposers" - ); - } - } - - record.record_success(); - Ok(validators) - } - pub async fn set_known_validators( &self, known_validators: Vec, @@ -1052,7 +1198,7 @@ impl PostgresDatabaseService { let keys_to_remove: Vec; { - let mut curr_known_guard = self.known_validators_cache.write(); + let mut curr_known_guard = self.local_cache.known_validators_cache.write(); info!("Known validators: current cache size: {:?}", curr_known_guard.len()); keys_to_add = new_keys_set.difference(&curr_known_guard).cloned().collect(); @@ -1107,54 +1253,32 @@ impl PostgresDatabaseService { Ok(()) } - pub fn known_validators_cache(&self) -> &Arc>> { - &self.known_validators_cache - } - #[instrument(skip_all)] - pub async fn get_validator_pool_name( - &self, - api_key: &str, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_validator_pool_name"); - - let client = self.high_priority_pool.get().await?; - - if self.validator_pool_cache.is_empty() { - let rows = client.query("SELECT * FROM validator_pools", &[]).await?; - for row in rows { - let api_key: String = row.get::<&str, &str>("api_key").to_string(); - let name: String = row.get::<&str, &str>("name").to_string(); - self.validator_pool_cache.insert(api_key, name); - } - } + pub async fn load_validator_pools(&self) { + let mut record = DbMetricRecord::new("load_validator_pools"); - if self.validator_pool_cache.contains_key(api_key) { - return Ok(self.validator_pool_cache.get(api_key).map(|f| f.clone())); - } - - let api_key = api_key.to_string(); - let rows = match client - .query("SELECT * FROM validator_pools WHERE api_key = $1", &[&api_key]) - .await - { + let client = match self.high_priority_pool.get().await { + Ok(client) => client, + Err(e) => { + error!("Error getting client from pool: {}", e); + return; + } + }; + let rows = match client.query("SELECT * FROM validator_pools", &[]).await { Ok(rows) => rows, Err(e) => { error!("Error querying validator_pools: {}", e); - return Err(DatabaseError::from(e)); + return; } }; - if rows.is_empty() { - return Ok(None); + for row in rows { + let api_key: String = row.get::<&str, &str>("api_key").to_string(); + let name: String = row.get::<&str, &str>("name").to_string(); + self.local_cache.validator_pool_cache.insert(api_key, name); } - let name: String = rows[0].get("name"); - - self.validator_pool_cache.insert(api_key.to_string(), name.clone()); - record.record_success(); - Ok(Some(name)) } #[instrument(skip_all)] @@ -1364,62 +1488,6 @@ impl PostgresDatabaseService { Ok(()) } - #[instrument(skip_all)] - pub async fn store_block_submission( - &self, - submission: SignedBidSubmission, - trace: SubmissionTrace, - optimistic_version: OptimisticVersion, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("store_block_submission"); - if let Some(sender) = &self.block_submissions_sender { - sender - .send(PendingBlockSubmissionValue { submission, trace, optimistic_version }) - .await - .map_err(|_| DatabaseError::ChannelSendError)?; - } else { - return Err(DatabaseError::ChannelSendError); - } - 
record.record_success(); - Ok(()) - } - - #[instrument(skip_all)] - pub async fn store_builder_info( - &self, - builder_pub_key: &BlsPublicKeyBytes, - builder_info: &BuilderInfo, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("store_builder_info"); - - info!("Storing builder info for {:?}", builder_pub_key); - - self.pool - .get() - .await? - .execute( - " - INSERT INTO builder_info (public_key, collateral, is_optimistic, is_optimistic_for_regional_filtering, builder_id, builder_ids) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (public_key) - DO UPDATE SET - builder_ids = array_concat_uniq(COALESCE(builder_info.builder_ids, '{}'::character varying[]), EXCLUDED.builder_ids) - ", - &[ - &(builder_pub_key.as_slice()), - &(PostgresNumeric::from(builder_info.collateral)), - &(builder_info.is_optimistic), - &(builder_info.is_optimistic_for_regional_filtering), - &(builder_info.builder_id), - &(builder_info.builder_ids), - ], - ) - .await?; - - record.record_success(); - Ok(()) - } - #[instrument(skip_all)] pub async fn store_builders_info( &self, @@ -1509,7 +1577,7 @@ impl PostgresDatabaseService { } #[instrument(skip_all)] - pub async fn get_all_builder_infos(&self) -> Result, DatabaseError> { + async fn get_all_builder_infos(&self) -> Result, DatabaseError> { let mut record = DbMetricRecord::new("get_all_builder_infos"); let rows = @@ -1519,18 +1587,6 @@ impl PostgresDatabaseService { parse_rows(rows) } - #[instrument(skip_all)] - pub async fn check_builder_api_key(&self, api_key: &str) -> Result { - let mut record = DbMetricRecord::new("check_builder_api_key"); - - let client = self.high_priority_pool.get().await?; - let rows = - client.query("SELECT * FROM builder_info WHERE api_key = $1", &[&(api_key)]).await?; - - record.record_success(); - Ok(!rows.is_empty()) - } - #[instrument(skip_all)] pub async fn db_demote_builder( &self, @@ -2205,7 +2261,7 @@ impl PostgresDatabaseService { adjusted_block_hash, adjusted_value FROM bid_adjustments - WHERE slot = $1 + WHERE slot = $1 AND COALESCE(is_dry_run, FALSE) = FALSE ", &[&(slot.as_u64() as i64)], ) @@ -2220,7 +2276,7 @@ impl PostgresDatabaseService { &self, entry: DataAdjustmentsEntry, ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("save_bloc_adjustments_data"); + let mut record = DbMetricRecord::new("save_block_adjustments_data"); self.pool .get() .await? @@ -2237,10 +2293,11 @@ impl PostgresDatabaseService { submitted_received_at, submitted_value, adjusted_block_hash, - adjusted_value + adjusted_value, + is_dry_run ) VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", &[ &(entry.slot.as_u64() as i64), @@ -2252,6 +2309,7 @@ impl PostgresDatabaseService { &PostgresNumeric::from(entry.submitted_value), &entry.adjusted_block_hash.as_slice(), &PostgresNumeric::from(entry.adjusted_value), + &entry.is_dry_run, ], ) .await?; @@ -2341,4 +2399,37 @@ impl PostgresDatabaseService { record.record_success(); Ok(results) } + + #[instrument(skip_all)] + pub async fn disable_adjustments(&self) -> Result<(), DatabaseError> { + let mut record = DbMetricRecord::new("disable_adjustments"); + + let mut client = self.high_priority_pool.get().await?; + let transaction = client.transaction().await?; + + transaction.execute("UPDATE relay_info SET adjustments_enabled = FALSE", &[]).await?; + // Commit explicitly: a tokio_postgres transaction rolls back when dropped. + transaction.commit().await?; + + record.record_success(); + Ok(()) + } + + pub async fn check_adjustments_enabled(&self) -> Result { + let mut record = DbMetricRecord::new("check_adjustments_enabled"); + + let rows = self + 
.pool + .get() + .await? + .query("SELECT adjustments_enabled FROM relay_info LIMIT 1", &[]) + .await?; + + let enabled = rows + .iter() + .map(|row| row.get::<_, bool>("adjustments_enabled")) + .next() + .unwrap_or(false); + + record.record_success(); + Ok(enabled) + } } diff --git a/crates/relay/src/database/postgres/postgres_db_service_tests.rs b/crates/relay/src/database/postgres/postgres_db_service_tests.rs index 5f753d6f..2fb1cb14 100644 --- a/crates/relay/src/database/postgres/postgres_db_service_tests.rs +++ b/crates/relay/src/database/postgres/postgres_db_service_tests.rs @@ -209,56 +209,6 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_save_and_get_builder_info() { - run_setup().await; - - let db_service = PostgresDatabaseService::new(&test_config(), 0).unwrap(); - - let public_key = BlsPublicKey::deserialize(&alloy_primitives::hex!("8C266FD5CB50B5D9431DAA69C4BE17BC9A79A85D172112DA09E0AC3E2D0DCF785021D49B6DF57827D6BC61EBA086A507")).unwrap().serialize().into(); - let builder_info = helix_common::BuilderInfo { - collateral: U256::from(10000000000000000000u64), - is_optimistic: false, - is_optimistic_for_regional_filtering: false, - builder_id: None, - builder_ids: Some(vec!["test3".to_string()]), - api_key: None, - }; - - let result = db_service.store_builder_info(&public_key, &builder_info).await; - assert!(result.is_ok()); - - let result = db_service.get_all_builder_infos().await.unwrap(); - assert!(result.len() == 1); - assert!(result[0].pub_key == public_key); - } - - #[tokio::test] - async fn test_demotion() { - run_setup().await; - - let db_service = PostgresDatabaseService::new(&test_config(), 0).unwrap(); - - let key = BlsSecretKey::random(); - let public_key = key.public_key().serialize().into(); - - let builder_info = helix_common::BuilderInfo { - collateral: Default::default(), - is_optimistic: false, - is_optimistic_for_regional_filtering: false, - builder_id: None, - builder_ids: None, - api_key: None, - }; - - let result = db_service.store_builder_info(&public_key, &builder_info).await; - assert!(result.is_ok()); - - let result = - db_service.db_demote_builder(0, &public_key, &Default::default(), "".to_string()).await; - assert!(result.is_ok()); - } - #[tokio::test] async fn test_store_block_submission() -> Result<(), Box> { run_setup().await; diff --git a/crates/relay/src/database/types/params.rs b/crates/relay/src/database/types/params.rs index 58e507d7..624cc85d 100644 --- a/crates/relay/src/database/types/params.rs +++ b/crates/relay/src/database/types/params.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use alloy_primitives::{Address, U256}; use helix_common::{Filtering, GetPayloadTrace}; use helix_types::{BlsPublicKeyBytes, PayloadAndBlobs}; @@ -10,7 +8,7 @@ pub struct SavePayloadParams { pub proposer_pub_key: BlsPublicKeyBytes, pub value: U256, pub proposer_fee_recipient: Address, - pub payload: Arc, + pub payload: PayloadAndBlobs, pub latency_trace: GetPayloadTrace, pub user_agent: Option, pub filtering: Filtering, diff --git a/crates/relay/src/gossip/types.rs b/crates/relay/src/gossip/types.rs index 6a409df9..ae838d1d 100644 --- a/crates/relay/src/gossip/types.rs +++ b/crates/relay/src/gossip/types.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use helix_types::{ BlsPublicKeyBytes, ForkName, ForkVersionDecode, PayloadAndBlobs, SignedBlindedBeaconBlock, }; @@ -58,7 +56,7 @@ fn decode_ssz_signed_blinded_beacon_block( #[derive(Clone, Debug)] pub struct BroadcastPayloadParams { - pub execution_payload: Arc, + pub execution_payload: 
PayloadAndBlobs, pub slot: u64, pub proposer_pub_key: BlsPublicKeyBytes, pub bid_data: PayloadBidData, @@ -106,10 +104,10 @@ impl BroadcastPayloadParams { fn decode_ssz_payload_and_blobs( bytes: &[u8], fork_name: Option, -) -> Result, ssz::DecodeError> { +) -> Result { if let Some(fork_name) = fork_name { let payload = PayloadAndBlobs::from_ssz_bytes_by_fork(bytes, fork_name)?; - return Ok(Arc::new(payload)); + return Ok(payload); } Err(ssz::DecodeError::NoMatchingVariant) diff --git a/crates/relay/src/housekeeper/error.rs b/crates/relay/src/housekeeper/error.rs index ae11793c..49348b9a 100644 --- a/crates/relay/src/housekeeper/error.rs +++ b/crates/relay/src/housekeeper/error.rs @@ -13,4 +13,7 @@ pub enum HousekeeperError { #[error("auctioneer error. {0}")] Auctioneer(#[from] AuctioneerError), + + #[error("oneshot channel recv error. {0}")] + Channel(#[from] tokio::sync::oneshot::error::RecvError), } diff --git a/crates/relay/src/housekeeper/housekeeper.rs b/crates/relay/src/housekeeper/housekeeper.rs index 199c9d29..27fffdd2 100644 --- a/crates/relay/src/housekeeper/housekeeper.rs +++ b/crates/relay/src/housekeeper/housekeeper.rs @@ -19,13 +19,13 @@ use tokio::sync::{Mutex, broadcast}; use tracing::{Instrument, debug, error, info, warn}; use crate::{ + DbHandle, auctioneer::Event, beacon::{ error::BeaconClientError, multi_beacon_client::MultiBeaconClient, types::{HeadEventData, StateId}, }, - database::postgres::postgres_db_service::PostgresDatabaseService, housekeeper::{ EthereumPrimevService, error::HousekeeperError, inclusion_list::InclusionListService, }, @@ -34,8 +34,6 @@ use crate::{ const PROPOSER_DUTIES_UPDATE_FREQ: u64 = 1; -const TRUSTED_PROPOSERS_UPDATE_FREQ: u64 = 5; - const CUTOFF_TIME: Duration = Duration::from_secs(4); // Constants for known validators refresh logic. @@ -47,12 +45,9 @@ struct HousekeeperSlots { head: Arc, proposer_duties: Arc, refreshed_validators: Arc, - trusted_proposers: Arc, // updating can take >> slot time, so we avoid concurrent updates by locking the mutex - updating_builder_infos: Arc>, updating_proposer_duties: Arc>, updating_refreshed_validators: Arc>, - updating_trusted_proposers: Arc>, } impl HousekeeperSlots { @@ -74,12 +69,6 @@ impl HousekeeperSlots { fn update_refreshed_validators(&self, head_slot: Slot) { self.refreshed_validators.store(head_slot.as_u64(), Ordering::Relaxed); } - fn trusted_proposers(&self) -> u64 { - self.trusted_proposers.load(Ordering::Relaxed) - } - fn update_trusted_proposers(&self, head_slot: Slot) { - self.trusted_proposers.store(head_slot.as_u64(), Ordering::Relaxed); - } } /// Housekeeper Service. @@ -90,19 +79,18 @@ impl HousekeeperSlots { /// will sync through db. 
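+/// DB access now goes through the channel-backed `DbHandle`; writes are fire-and-forget
+/// and surface failures via logs and, on critical paths, failsafe flags.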
#[derive(Clone)] pub struct Housekeeper { - db: Arc, + db: DbHandle, beacon_client: Arc, auctioneer: Arc, chain_info: Arc, primev_service: Option, slots: HousekeeperSlots, inclusion_list_service: Option, - local_builders: Vec, } impl Housekeeper { pub fn new( - db: Arc, + db: DbHandle, beacon_client: Arc, auctioneer: Arc, config: &RelayConfig, @@ -132,7 +120,6 @@ impl Housekeeper { primev_service, slots: HousekeeperSlots::default(), inclusion_list_service, - local_builders: config.builders.clone(), } } @@ -237,52 +224,6 @@ impl Housekeeper { ); } } - - // builder info updated every slot - let housekeeper = self.clone(); - task::spawn( - file!(), - line!(), - async move { - let Ok(_guard) = housekeeper.slots.updating_builder_infos.try_lock() else { - warn!("builder info update already in progress"); - return; - }; - let start = Instant::now(); - if let Err(err) = housekeeper.sync_builder_info_changes().await { - error!(%err, "failed to sync builder info changes"); - } - info!(duration = ?start.elapsed(), "sync builder info task completed"); - } - .in_current_span(), - ); - - // trusted proposers - if self.should_update_trusted_proposers(head_slot.as_u64()) { - if is_local_dev() { - warn!("skipping refresh of trusted proposers") - } else { - let housekeeper = self.clone(); - task::spawn( - file!(), - line!(), - async move { - let Ok(_guard) = housekeeper.slots.updating_trusted_proposers.try_lock() else { - warn!("trusted proposer update already in progress"); - return; - }; - let start = Instant::now(); - if let Err(err) = housekeeper.update_trusted_proposers().await { - error!(%err, "failed to update trusted proposers"); - } else { - housekeeper.slots.update_trusted_proposers(head_slot); - } - info!(duration = ?start.elapsed(), "update trusted proposers task completed"); - } - .in_current_span(), - ); - } - } } #[tracing::instrument(skip_all, name = "update_proposer_duties", fields(epoch = %epoch))] @@ -360,21 +301,7 @@ impl Housekeeper { let validators = self.beacon_client.get_state_validators(StateId::Head).await?; debug!(validators = validators.len(), duration = ?start.elapsed(), "fetched validators"); - self.db.set_known_validators(validators).await?; - - Ok(()) - } - - /// Synchronizes builder information changes. 
- async fn sync_builder_info_changes(&self) -> Result<(), HousekeeperError> { - let mut builder_infos = self.db.get_all_builder_infos().await?; - debug!(builder_infos = builder_infos.len(), "updating builder infos"); - - if is_local_dev() { - builder_infos.extend_from_slice(self.local_builders.as_slice()); - } - - self.auctioneer.update_builder_infos(&builder_infos, true); + self.db.set_known_validators(validators); Ok(()) } @@ -434,7 +361,7 @@ impl Housekeeper { if is_local_dev() { warn!("skipping proposer duty update in db"); } else { - self.db.set_proposer_duties(formatted_proposer_duties).await?; + self.db.set_proposer_duties(formatted_proposer_duties); } Ok(()) @@ -449,7 +376,7 @@ async fn primev_update_with_duties( primev_service: EthereumPrimevService, auctioneer: Arc, - db: Arc, + db: DbHandle, proposer_duties: Vec, ) -> Result<(), HousekeeperError> { let primev_validators = @@ -517,26 +444,7 @@ auctioneer.update_builder_infos(&primev_builders_config, false); - db.store_builders_info(primev_builders_config.as_slice()).await?; - - Ok(()) - } - - fn should_update_trusted_proposers(&self, head_slot: u64) -> bool { - let last_updated = self.slots.trusted_proposers(); - head_slot.is_multiple_of(TRUSTED_PROPOSERS_UPDATE_FREQ) || - head_slot.saturating_sub(last_updated) >= TRUSTED_PROPOSERS_UPDATE_FREQ - } - - /// Update the proposer whitelist. - /// - /// This function will fetch the proposer whitelist from the database and update the auctioneer. - /// It will also update the `refreshed_trusted_proposers_slot` to the current `head_slot`. - async fn update_trusted_proposers(&self) -> Result<(), HousekeeperError> { - let start = Instant::now(); - let proposer_whitelist = self.db.get_trusted_proposers().await?; - debug!(proposer_whitelist = proposer_whitelist.len(), duration = ?start.elapsed(), "fetched trusted proposers"); - self.auctioneer.update_trusted_proposers(proposer_whitelist); + db.store_builders_info(primev_builders_config); Ok(()) } @@ -555,12 +463,11 @@ - /// Fetch validator registrations for `pub_keys` from database. + /// Fetch validator registrations for `pub_keys` from the in-memory registration cache. 
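+ /// (The cache is populated at startup and periodically refreshed by `start_db_service`.)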
async fn fetch_signed_validator_registrations( &self, - pubkeys: &[&BlsPublicKeyBytes], + pubkeys: &[BlsPublicKeyBytes], ) -> Result, HousekeeperError> { let registrations: Vec = - self.db.get_validator_registrations_for_pub_keys(pubkeys).await?; - + self.auctioneer.get_validator_registrations_for_pub_keys(pubkeys); let registrations = registrations.into_iter().map(|entry| (*entry.public_key(), entry)).collect(); @@ -576,8 +483,8 @@ impl Housekeeper { HousekeeperError, > { let proposer_duties = self.fetch_duties(epoch.as_u64()).await?; - let pubkeys: Vec<&BlsPublicKeyBytes> = - proposer_duties.iter().map(|duty| &duty.pubkey).collect(); + let pubkeys: Vec = + proposer_duties.iter().map(|duty| duty.pubkey).collect(); let signed_validator_registrations = self.fetch_signed_validator_registrations(&pubkeys).await?; diff --git a/crates/relay/src/housekeeper/inclusion_list/service.rs b/crates/relay/src/housekeeper/inclusion_list/service.rs index 656d4477..311524df 100644 --- a/crates/relay/src/housekeeper/inclusion_list/service.rs +++ b/crates/relay/src/housekeeper/inclusion_list/service.rs @@ -8,10 +8,10 @@ use helix_common::{ local_cache::LocalCache, }; use helix_types::{BlsPublicKeyBytes, Slot}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use crate::{ - auctioneer::Event, database::postgres::postgres_db_service::PostgresDatabaseService, + DbHandle, auctioneer::Event, housekeeper::inclusion_list::http_fetcher::HttpInclusionListFetcher, network::RelayNetworkManager, }; @@ -20,7 +20,7 @@ const MISSING_INCLUSION_LIST_CUTOFF: Duration = Duration::from_secs(6); #[derive(Clone)] pub struct InclusionListService { - db: Arc, + db: DbHandle, local_cache: Arc, http_il_fetcher: HttpInclusionListFetcher, chain_info: Arc, @@ -30,7 +30,7 @@ pub struct InclusionListService { impl InclusionListService { pub fn new( - db: Arc, + db: DbHandle, local_cache: Arc, config: InclusionListConfig, chain_info: Arc, @@ -83,15 +83,7 @@ impl InclusionListService { il: Some(inclusion_list.clone()), }); - match self.db.save_inclusion_list(&inclusion_list, head_slot, &parent_hash, &pub_key).await - { - Ok(_) => { - info!(head_slot = head_slot, "Saved inclusion list to postgres"); - } - Err(err) => { - error!(head_slot, "Could not save inclusion list to postgres. 
Error: {:?}", err); - } - } + self.db.save_inclusion_list(inclusion_list, head_slot, parent_hash, pub_key); } async fn fetch_inclusion_list_or_timeout(&self, slot: u64) -> Option { diff --git a/crates/relay/src/housekeeper/mod.rs b/crates/relay/src/housekeeper/mod.rs index 45988784..248821ff 100644 --- a/crates/relay/src/housekeeper/mod.rs +++ b/crates/relay/src/housekeeper/mod.rs @@ -19,8 +19,8 @@ pub use slot_info::CurrentSlotInfo; use tokio::sync::broadcast; use crate::{ - auctioneer::Event, beacon::multi_beacon_client::MultiBeaconClient, - database::postgres::postgres_db_service::PostgresDatabaseService, network::RelayNetworkManager, + DbHandle, auctioneer::Event, beacon::multi_beacon_client::MultiBeaconClient, + network::RelayNetworkManager, }; const HEAD_EVENT_CHANNEL_SIZE: usize = 100; @@ -28,7 +28,7 @@ const PAYLOAD_ATTRIBUTE_CHANNEL_SIZE: usize = 300; /// Start housekeeper and chain updater pub async fn start_housekeeper( - db: Arc, + db: DbHandle, auctioneer: Arc, config: &RelayConfig, beacon_client: Arc, diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index f08b2c17..4e988581 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -9,9 +9,15 @@ mod website; pub use crate::{ api::{Api, BidAdjustor, DefaultBidAdjustor, start_admin_service, start_api_service}, - auctioneer::{PayloadEntry, SimulatorClient}, + auctioneer::{ + AuctioneerHandle, Event, PayloadEntry, RegWorkerHandle, SimulatorClient, SimulatorRequest, + SlotData, SubmissionPayload, spawn_workers, + }, beacon::start_beacon_client, - database::{postgres::postgres_db_service::PostgresDatabaseService, start_db_service}, + database::{ + DbRequest, PendingBlockSubmissionValue, handle::DbHandle, + postgres::postgres_db_service::PostgresDatabaseService, start_db_service, + }, housekeeper::start_housekeeper, network::RelayNetworkManager, website::WebsiteService, diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index 2c90303e..fd32edf6 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -9,6 +9,7 @@ use std::{ use eyre::eyre; use helix_common::{ RelayConfig, + api::builder_api::TopBidUpdate, api_provider::DefaultApiProvider, load_config, load_keypair, local_cache::LocalCache, @@ -18,8 +19,9 @@ use helix_common::{ utils::{init_panic_hook, init_tracing_log}, }; use helix_relay::{ - Api, DefaultBidAdjustor, PostgresDatabaseService, RelayNetworkManager, WebsiteService, - start_admin_service, start_api_service, start_beacon_client, start_db_service, + Api, AuctioneerHandle, DbHandle, DbRequest, DefaultBidAdjustor, Event, + PendingBlockSubmissionValue, RegWorkerHandle, RelayNetworkManager, WebsiteService, + spawn_workers, start_admin_service, start_api_service, start_beacon_client, start_db_service, start_housekeeper, }; use helix_types::BlsKeypair; @@ -60,8 +62,39 @@ fn main() { config.logging.dir_path(), ); + let (db_request_sender, db_request_receiver) = crossbeam_channel::bounded(10_000); + let (db_batch_request_sender, db_batch_request_receiver) = crossbeam_channel::bounded(10_000); + let (top_bid_tx, _) = tokio::sync::broadcast::channel(100); + let event_channel = crossbeam_channel::bounded(10_000); + let event_tx = event_channel.0.clone(); + + let db = DbHandle::new(db_request_sender.clone(), db_batch_request_sender.clone()); + let local_cache = LocalCache::new(); + + // spawn auctioneer + let (auctioneer_handle, registrations_handle) = spawn_workers( + config.clone(), + db.clone(), + local_cache.clone(), + DefaultBidAdjustor {}, + top_bid_tx.clone(), + 
event_channel, + ); + block_on(start_metrics_server(&config)); - match block_on(run(instance_id, config, keypair)) { + match block_on(start_network_services( + instance_id, + config, + keypair, + db, + local_cache, + auctioneer_handle, + registrations_handle, + db_request_receiver, + db_batch_request_receiver, + top_bid_tx, + event_tx, + )) { Ok(_) => info!("relay exited"), Err(err) => { error!(%err, "relay exited with error"); @@ -70,10 +103,25 @@ fn main() { } } -async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> eyre::Result<()> { +#[allow(clippy::too_many_arguments)] +async fn start_network_services( + instance_id: String, + config: RelayConfig, + keypair: BlsKeypair, + db: DbHandle, + mut local_cache: LocalCache, + auctioneer_handle: AuctioneerHandle, + registrations_handle: RegWorkerHandle, + db_request_receiver: crossbeam_channel::Receiver, + db_batch_request_receiver: crossbeam_channel::Receiver, + top_bid_tx: tokio::sync::broadcast::Sender, + event_channel: crossbeam_channel::Sender, +) -> eyre::Result<()> { let beacon_client = start_beacon_client(&config); let chain_info = beacon_client.load_chain_info().await; let chain_info = Arc::new(chain_info); + local_cache.set_chain_info(chain_info.clone()); + let local_cache = Arc::new(local_cache); info!( instance_id, @@ -87,15 +135,18 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e let known_validators_loaded = Arc::new(AtomicBool::default()); - let db = start_db_service(&config, known_validators_loaded.clone()).await?; - let local_cache = start_auctioneer(db.clone()).await?; + let pg_db = start_db_service( + &config, + known_validators_loaded.clone(), + db_request_receiver, + db_batch_request_receiver, + local_cache.clone(), + ) + .await?; - let event_channel = crossbeam_channel::bounded(10_000); let relay_network_api = RelayNetworkManager::new(config.relay_network.clone(), relay_signing_context.clone()); - let (top_bid_tx, _) = tokio::sync::broadcast::channel(100); - config.router_config.validate_bid_sorter()?; let current_slot_info = start_housekeeper( @@ -104,7 +155,7 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e &config, beacon_client.clone(), chain_info.clone(), - event_channel.0.clone(), + event_channel, relay_network_api.clone(), ) .await @@ -116,6 +167,7 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e tokio::spawn(start_api_service::( config.clone(), + pg_db.clone(), db.clone(), local_cache, current_slot_info, @@ -123,18 +175,18 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e relay_signing_context, beacon_client, Arc::new(DefaultApiProvider {}), - DefaultBidAdjustor {}, known_validators_loaded, terminating.clone(), top_bid_tx, - event_channel, + auctioneer_handle.clone(), + registrations_handle.clone(), relay_network_api.api(), )); let termination_grace_period = config.router_config.shutdown_delay_ms; if config.website.enabled { - tokio::spawn(WebsiteService::run_loop(config, db)); + tokio::spawn(WebsiteService::run_loop(config, pg_db)); } // wait for SIGTERM or SIGINT @@ -157,14 +209,3 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e Ok(()) } - -pub async fn start_auctioneer(db: Arc) -> eyre::Result> { - let auctioneer = Arc::new(LocalCache::new()); - let auctioneer_clone = auctioneer.clone(); - tokio::spawn(async move { - let builder_infos = db.get_all_builder_infos().await.expect("failed to load builder infos"); - 
auctioneer_clone.update_builder_infos(&builder_infos, true); - }); - - Ok(auctioneer) -} diff --git a/crates/simulator/src/block_merging/mod.rs b/crates/simulator/src/block_merging/mod.rs index bb726387..98e2a6ab 100644 --- a/crates/simulator/src/block_merging/mod.rs +++ b/crates/simulator/src/block_merging/mod.rs @@ -40,7 +40,7 @@ use revm::{ DatabaseCommit, DatabaseRef, database::{CacheDB, State}, }; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, warn}; pub(crate) use crate::block_merging::api::{BlockMergingApi, BlockMergingApiServer}; use crate::{ diff --git a/crates/simulator/src/validation/mod.rs b/crates/simulator/src/validation/mod.rs index c2ea4162..476dcbf3 100644 --- a/crates/simulator/src/validation/mod.rs +++ b/crates/simulator/src/validation/mod.rs @@ -302,6 +302,7 @@ impl ValidationApi { message: &BidTrace, ) -> Result<(), ValidationApiError> { if header.hash() != message.block_hash { + tracing::error!("Block hash mismatch: {message:?}, {header:?}"); Err(ValidationApiError::BlockHashMismatch(GotExpected { got: message.block_hash, expected: header.hash(), diff --git a/crates/types/src/bid_submission.rs b/crates/types/src/bid_submission.rs index 7d90731a..dea19d8e 100644 --- a/crates/types/src/bid_submission.rs +++ b/crates/types/src/bid_submission.rs @@ -10,7 +10,7 @@ use tree_hash_derive::TreeHash; use crate::{ BlobsBundle, BlobsBundleV1, BlobsBundleV2, BlobsError, Bloom, BlsPublicKey, BlsPublicKeyBytes, - BlsSignature, BlsSignatureBytes, ExecutionPayload, ExtraData, PayloadAndBlobsRef, SszError, + BlsSignature, BlsSignatureBytes, ExecutionPayload, ExtraData, PayloadAndBlobs, SszError, bid_adjustment_data::BidAdjustmentData, error::SigError, fields::ExecutionRequests, }; @@ -325,6 +325,15 @@ impl SignedBidSubmission { } } + pub fn message_mut(&mut self) -> &mut BidTrace { + match self { + SignedBidSubmission::Electra(signed_bid_submission) => { + &mut signed_bid_submission.message + } + SignedBidSubmission::Fulu(signed_bid_submission) => &mut signed_bid_submission.message, + } + } + pub fn execution_payload_ref(&self) -> &ExecutionPayload { match self { SignedBidSubmission::Electra(signed_bid_submission) => { @@ -336,26 +345,37 @@ impl SignedBidSubmission { } } - pub fn payload_and_blobs_ref(&self) -> PayloadAndBlobsRef<'_> { + pub fn execution_payload_make_mut(&mut self) -> &mut ExecutionPayload { + match self { + SignedBidSubmission::Electra(signed_bid_submission) => { + Arc::make_mut(&mut signed_bid_submission.execution_payload) + } + SignedBidSubmission::Fulu(signed_bid_submission) => { + Arc::make_mut(&mut signed_bid_submission.execution_payload) + } + } + } + + pub fn payload_and_blobs(&self) -> PayloadAndBlobs { match self { - SignedBidSubmission::Electra(signed_bid_submission) => PayloadAndBlobsRef { - execution_payload: self.execution_payload_ref(), - blobs_bundle: &signed_bid_submission.blobs_bundle, + SignedBidSubmission::Electra(sub) => PayloadAndBlobs { + execution_payload: sub.execution_payload.clone(), + blobs_bundle: sub.blobs_bundle.clone(), }, - SignedBidSubmission::Fulu(signed_bid_submission) => PayloadAndBlobsRef { - execution_payload: self.execution_payload_ref(), - blobs_bundle: &signed_bid_submission.blobs_bundle, + SignedBidSubmission::Fulu(sub) => PayloadAndBlobs { + execution_payload: sub.execution_payload.clone(), + blobs_bundle: sub.blobs_bundle.clone(), }, } } - pub fn execution_requests(&self) -> Arc { + pub fn execution_requests_ref(&self) -> &Arc { match self { SignedBidSubmission::Electra(signed_bid_submission) => { - 
-                signed_bid_submission.execution_requests.clone()
+                &signed_bid_submission.execution_requests
             }
             SignedBidSubmission::Fulu(signed_bid_submission) => {
-                signed_bid_submission.execution_requests.clone()
+                &signed_bid_submission.execution_requests
             }
         }
     }
@@ -486,12 +506,14 @@ impl SignedBidSubmission {
         }
     }
 
-    pub fn value(&self) -> U256 {
+    pub fn value(&self) -> &U256 {
         match self {
             SignedBidSubmission::Electra(signed_bid_submission) => {
-                signed_bid_submission.message.value
+                &signed_bid_submission.message.value
+            }
+            SignedBidSubmission::Fulu(signed_bid_submission) => {
+                &signed_bid_submission.message.value
             }
-            SignedBidSubmission::Fulu(signed_bid_submission) => signed_bid_submission.message.value,
         }
     }
 
diff --git a/crates/types/src/blobs.rs b/crates/types/src/blobs.rs
index 883fc130..d1555e52 100644
--- a/crates/types/src/blobs.rs
+++ b/crates/types/src/blobs.rs
@@ -381,11 +381,10 @@ impl Default for BlobsBundle {
 }
 
 /// Similar to lighthouse but using our BlobsBundleV1
-// TODO: arc the fields
 #[derive(Clone, PartialEq, Debug, Serialize, Deserialize, Encode)]
 pub struct PayloadAndBlobs {
-    pub execution_payload: ExecutionPayload,
-    pub blobs_bundle: BlobsBundle,
+    pub execution_payload: Arc<ExecutionPayload>,
+    pub blobs_bundle: Arc<BlobsBundle>,
 }
 
 // From lighthouse
@@ -397,10 +396,10 @@ impl ForkVersionDecode for PayloadAndBlobs {
         let mut decoder = builder.build()?;
 
         if fork_name.deneb_enabled() {
-            let execution_payload = decoder.decode_next_with(|bytes| {
+            let execution_payload = Arc::new(decoder.decode_next_with(|bytes| {
                 ExecutionPayload::from_ssz_bytes_by_fork(bytes, fork_name)
-            })?;
-            let blobs_bundle = decoder.decode_next()?;
+            })?);
+            let blobs_bundle = Arc::new(decoder.decode_next()?);
             Ok(Self { execution_payload, blobs_bundle })
         } else {
             Err(DecodeError::BytesInvalid(format!(
@@ -410,34 +409,6 @@ impl ForkVersionDecode for PayloadAndBlobs {
     }
 }
 
-// BlobsBundleV2 used in fulu is the same as V1 except the number of proofs is much larger
-// as each proof is for a cell in the blob, not one per blob.
-// But as we use a Vec, we can use the same struct here and just validate the lengths differently if
-// needed.
-
-#[derive(Clone, PartialEq, Debug, Serialize, Encode)]
-pub struct PayloadAndBlobsRef<'a> {
-    pub execution_payload: &'a ExecutionPayload,
-    pub blobs_bundle: &'a BlobsBundle,
-}
-
-impl<'a> From<&'a PayloadAndBlobs> for PayloadAndBlobsRef<'a> {
-    fn from(payload_and_blobs: &'a PayloadAndBlobs) -> Self {
-        PayloadAndBlobsRef {
-            execution_payload: &payload_and_blobs.execution_payload,
-            blobs_bundle: &payload_and_blobs.blobs_bundle,
-        }
-    }
-}
-
-impl PayloadAndBlobsRef<'_> {
-    /// Clone out an owned `PayloadAndBlobs`
-    pub fn to_owned(&self) -> PayloadAndBlobs {
-        let execution_payload = self.execution_payload.clone();
-        let blobs_bundle = self.blobs_bundle.clone();
-        PayloadAndBlobs { execution_payload, blobs_bundle }
-    }
-}
-
 // From lighthouse, replacing the blobs and kzg proofs
 #[derive(Debug, Clone, Serialize, Deserialize, Encode)]
 pub struct SignedBlockContents {
@@ -487,7 +458,7 @@ mod tests {
     fn test_payload_and_blobs_equivalence() {
         let data_json = include_str!("testdata/signed-bid-submission-electra.json");
         let signed_bid = test_encode_decode_json::<SignedBidSubmission>(data_json);
-        let ex = signed_bid.payload_and_blobs_ref().to_owned();
+        let ex = signed_bid.payload_and_blobs();
 
         let data_ssz = ex.as_ssz_bytes();
 
diff --git a/crates/types/src/hydration.rs b/crates/types/src/hydration.rs
index 2f227f0d..da7dc6be 100644
--- a/crates/types/src/hydration.rs
+++ b/crates/types/src/hydration.rs
@@ -7,6 +7,7 @@ use rustc_hash::{FxHashMap, FxHasher};
 use serde::{Deserialize, Serialize};
 use ssz::{Decode, DecodeError};
 use ssz_derive::{Decode, Encode};
+use tracing::trace;
 use tree_hash::TreeHash;
 
 use crate::{
@@ -217,7 +218,7 @@ impl DehydratedBidSubmissionElectra {
             let bytes = tx.as_ref().try_into().unwrap();
             let hash = u64::from_le_bytes(bytes);
             let Some(cached_tx) = order_cache.transactions.get(&hash) else {
-                last_err = Err(HydrationError::UnknownTxHash { index });
+                last_err = Err(HydrationError::UnknownTxHash { index, hash });
                 continue;
             };
 
@@ -238,6 +239,7 @@ impl DehydratedBidSubmissionElectra {
             hasher.write(last_slice);
             let hash = hasher.finish();
             order_cache.transactions.insert(hash, tx.clone());
+            trace!("Inserted tx into cache: index {}, hash {}", index, hash);
         };
     }
 
@@ -310,7 +312,7 @@ impl DehydratedBidSubmissionFulu {
             let bytes = tx.as_ref().try_into().unwrap();
             let hash = u64::from_le_bytes(bytes);
             let Some(cached_tx) = order_cache.transactions.get(&hash) else {
-                last_err = Err(HydrationError::UnknownTxHash { index });
+                last_err = Err(HydrationError::UnknownTxHash { index, hash });
                 continue;
             };
 
@@ -331,6 +333,7 @@ impl DehydratedBidSubmissionFulu {
             hasher.write(last_slice);
             let hash = hasher.finish();
             order_cache.transactions.insert(hash, tx.clone());
+            trace!("Inserted tx into cache: index {}, hash {}", index, hash);
         };
     }
 
@@ -436,8 +439,8 @@ impl Default for HydrationCache {
 
 #[derive(Debug, thiserror::Error)]
 pub enum HydrationError {
-    #[error("unkown tx: index {index}")]
-    UnknownTxHash { index: usize },
+    #[error("unknown tx: index {index}, hash {hash}")]
+    UnknownTxHash { index: usize, hash: u64 },
 
     #[error("invalid tx bytes: length {length}, index {index}")]
     InvalidTxLength { length: usize, index: usize },
 
diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs
index f99aedc2..f0e1835b 100644
--- a/crates/types/src/lib.rs
+++ b/crates/types/src/lib.rs
@@ -12,8 +12,6 @@ mod test_utils;
 mod utils;
 mod validator;
 
-use std::sync::Arc;
-
 pub use bid_adjustment_data::{BidAdjData, BidAdjustmentData};
 pub use bid_submission::*;
 pub use blobs::*;
@@ -85,7 +83,7 @@
 pub type BlindedPayload = lh_types::payload::BlindedPayload<MainnetEthSpec>;
 pub type BlindedPayloadRef<'a> = lh_types::payload::BlindedPayloadRef<'a, MainnetEthSpec>;
 
 /// Response object of POST `/eth/v1/builder/blinded_blocks`
-pub type GetPayloadResponse = lh_types::ForkVersionedResponse<Arc<PayloadAndBlobs>>;
+pub type GetPayloadResponse = lh_types::ForkVersionedResponse<PayloadAndBlobs>;
 
 // Registration
 pub type ValidatorRegistration = validator::ValidatorRegistrationData;

From 05c47c0e121263b94a662b7c244ca721d5240e15 Mon Sep 17 00:00:00 2001
From: owen
Date: Tue, 13 Jan 2026 19:29:02 +0000
Subject: [PATCH 2/2] fixup after merging develop

---
 crates/common/src/local_cache.rs             |  19 +++-
 crates/relay/src/auctioneer/block_merger.rs  |  12 +-
 crates/relay/src/auctioneer/context.rs       | 104 +++---------------
 crates/relay/src/auctioneer/get_header.rs    |   4 +-
 crates/relay/src/auctioneer/mod.rs           |   3 -
 crates/relay/src/auctioneer/submit_block.rs  |  13 +--
 crates/relay/src/database/handle.rs          |  10 +-
 .../database/postgres/postgres_db_service.rs |  71 ++----------
 crates/relay/src/housekeeper/housekeeper.rs  |  50 +--------
 crates/relay/src/lib.rs                      |   2 +-
 crates/simulator/src/block_merging/mod.rs    |   5 +-
 11 files changed, 61 insertions(+), 232 deletions(-)

diff --git a/crates/common/src/local_cache.rs b/crates/common/src/local_cache.rs
index c40abe29..092b9327 100644
--- a/crates/common/src/local_cache.rs
+++ b/crates/common/src/local_cache.rs
@@ -30,6 +30,7 @@ use crate::{
 const ESTIMATED_TRUSTED_PROPOSERS: usize = 200_000;
 const ESTIMATED_BUILDER_INFOS_UPPER_BOUND: usize = 1000;
 const MAX_PRIMEV_PROPOSERS: usize = 64;
+const VALIDATOR_REGISTRATION_UPDATE_INTERVAL: u64 = 60 * 60; // 1 hour in seconds
 
 #[derive(Debug, thiserror::Error)]
 pub enum AuctioneerError {
@@ -117,12 +118,15 @@ impl LocalCache {
         let trusted_proposers = Arc::new(DashMap::with_capacity(ESTIMATED_TRUSTED_PROPOSERS));
         let primev_proposers = Arc::new(DashSet::with_capacity(MAX_PRIMEV_PROPOSERS));
         let kill_switch = Arc::new(AtomicBool::new(false));
-        let proposer_duties = Arc::new(RwLock::new(Vec::new()));
+        let proposer_duties = Arc::new(RwLock::new(Vec::with_capacity(1000)));
         let merged_blocks = Arc::new(DashMap::with_capacity(1000));
-        let validator_registration_cache = Arc::new(DashMap::new());
-        let pending_validator_registrations = Arc::new(DashSet::new());
-        let known_validators_cache = Arc::new(RwLock::new(FxHashSet::default()));
-        let validator_pool_cache = Arc::new(DashMap::new());
+        let validator_registration_cache = Arc::new(DashMap::with_capacity(1_800_000));
+        let pending_validator_registrations = Arc::new(DashSet::with_capacity(20_000));
+        let known_validators_cache = Arc::new(RwLock::new(FxHashSet::with_capacity_and_hasher(
+            1_200_000,
+            Default::default(),
+        )));
+        let validator_pool_cache = Arc::new(DashMap::with_capacity(1000));
         let adjustments_enabled = Arc::new(AtomicBool::new(false));
         let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false));
 
@@ -267,7 +271,10 @@ impl LocalCache {
         if let Some(existing_entry) =
             self.validator_registration_cache.get(&registration.message.pubkey) &&
             existing_entry.registration_info.registration.message.timestamp >=
-                registration.message.timestamp.saturating_sub(60 * 60) &&
+                registration
+                    .message
+                    .timestamp
+                    .saturating_sub(VALIDATOR_REGISTRATION_UPDATE_INTERVAL) &&
             existing_entry.registration_info.registration.message.fee_recipient ==
                 registration.message.fee_recipient &&
             existing_entry.registration_info.registration.message.gas_limit ==
diff --git a/crates/relay/src/auctioneer/block_merger.rs b/crates/relay/src/auctioneer/block_merger.rs
index 66a748d8..ead3fdbb 100644
--- a/crates/relay/src/auctioneer/block_merger.rs
+++ b/crates/relay/src/auctioneer/block_merger.rs
@@ -674,15 +674,13 @@ fn get_tx_versioned_hashes(mut raw_tx: &[u8]) -> Vec<B256> {
         Err(err) => {
             warn!(?err, "failed to decode transaction for versioned hash extraction");
             return vec![];
-        },
+        }
     };
     match tx {
-        TxEnvelope::Eip4844(tx_eip4844) => {
-            match tx_eip4844.blob_versioned_hashes() {
-                Some(vhs) => vhs.to_vec(),
-                None => vec![],
-            }
-        }
+        TxEnvelope::Eip4844(tx_eip4844) => match tx_eip4844.blob_versioned_hashes() {
+            Some(vhs) => vhs.to_vec(),
+            None => vec![],
+        },
         _ => vec![],
     }
 }
diff --git a/crates/relay/src/auctioneer/context.rs b/crates/relay/src/auctioneer/context.rs
index 17fb8036..827a3c3b 100644
--- a/crates/relay/src/auctioneer/context.rs
+++ b/crates/relay/src/auctioneer/context.rs
@@ -9,7 +9,7 @@ use std::{
 
 use alloy_primitives::{B256, U256};
 use helix_common::{
-    BuilderInfo, RelayConfig, local_cache::LocalCache, metrics::SimulatorMetrics, spawn_tracked, utils::alert_discord,
+    BuilderInfo, RelayConfig, local_cache::LocalCache, metrics::SimulatorMetrics, spawn_tracked,
 };
 use helix_types::{BlsPublicKeyBytes, HydrationCache, Slot, SubmissionVersion};
 use rustc_hash::FxHashMap;
@@ -49,8 +49,6 @@ pub struct Context {
     pub db: DbHandle,
     pub slot_context: SlotContext,
     pub bid_adjustor: B,
-    pub adjustments_enabled: Arc<AtomicBool>,
-    pub adjustments_failsafe_trigger: Arc<AtomicBool>,
 }
 
 const EXPECTED_PAYLOADS_PER_SLOT: usize = 5000;
@@ -99,28 +97,9 @@ impl Context {
             block_merger,
         };
 
-        let adjustments_enabled = Arc::new(AtomicBool::new(false));
-        let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false));
+        Self::spawn_adjustments_dry_run_task(auctioneer, cache.adjustments_enabled.clone());
 
-        Self::spawn_check_flag_task(
-            db.clone(),
-            adjustments_enabled.clone(),
-            adjustments_failsafe_trigger.clone(),
-        );
-
-        Self::spawn_adjustments_dry_run_task(auctioneer, adjustments_enabled.clone());
-
-        Self {
-            chain_info,
-            cache,
-            unknown_builder_info,
-            slot_context,
-            db,
-            config,
-            bid_adjustor,
-            adjustments_enabled,
-            adjustments_failsafe_trigger,
-        }
+        Self { cache, unknown_builder_info, slot_context, db, config, bid_adjustor }
     }
 
     pub fn builder_info(&self, builder: &BlsPublicKeyBytes) -> BuilderInfo {
@@ -154,26 +133,16 @@ impl Context {
             result.submission.execution_payload_ref().to_header(None, None)
         );
 
-        if !self.adjustments_enabled.load(Ordering::Relaxed) {
+        if !self.cache.adjustments_enabled.load(Ordering::Relaxed) {
             warn!(%block_hash, "adjustments already disabled");
         } else {
             SimulatorMetrics::disable_adjustments();
-            self.adjustments_enabled.store(false, Ordering::Relaxed);
-
-            let db = self.db.clone();
-            let failsafe_trigger = self.adjustments_failsafe_trigger.clone();
-            let adjustments_enabled = self.adjustments_enabled.clone();
-            spawn_tracked!(async move {
-                if let Err(err) = db.disable_adjustments().await {
-                    failsafe_trigger.store(true, Ordering::Relaxed);
-                    adjustments_enabled.store(false, Ordering::Relaxed);
-                    error!(%block_hash, %err, "failed to disable adjustments in database, pulling the failsafe trigger");
-                    alert_discord(&format!(
-                        "{} {} failed to disable adjustments in database, pulling the failsafe trigger",
-                        err, block_hash
-                    ));
-                }
-            });
+            self.cache.adjustments_enabled.store(false, Ordering::Relaxed);
+            self.db.disable_adjustments(
+                block_hash,
+                self.cache.adjustments_failsafe_trigger.clone(),
+                self.cache.adjustments_enabled.clone(),
+            );
         }
     } else if self.cache.demote_builder(&builder) {
         warn!(%builder, %block_hash, %err, "Block simulation resulted in an error. Demoting builder...");
@@ -184,20 +153,13 @@ impl Context {
             let bid_slot = result.submission.slot();
 
             let failsafe_triggered = self.sim_manager.failsafe_triggered.clone();
-            let db = self.db.clone();
-            spawn_tracked!(async move {
-                if let Err(err) = db
-                    .db_demote_builder(bid_slot.as_u64(), &builder, &block_hash, reason)
-                    .await
-                {
-                    failsafe_triggered.store(true, Ordering::Relaxed);
-                    error!(%builder, %err, %block_hash, "failed to demote builder in database! Pausing all optmistic submissions");
-                    alert_discord(&format!(
-                        "{} {} {} failed to demote builder in database! Pausing all optmistic submissions",
-                        builder, err, block_hash
-                    ));
-                }
-            });
+            self.db.db_demote_builder(
+                bid_slot.as_u64(),
+                builder,
+                block_hash,
+                reason,
+                failsafe_triggered,
+            );
         } else {
             warn!(%err, %builder, %block_hash, "builder already demoted, skipping demotion");
         }
@@ -271,38 +233,6 @@ impl Context {
         self.payloads.insert(block_hash, payload);
     }
 
-    fn spawn_check_flag_task(
-        db: Arc,
-        flag: Arc<AtomicBool>,
-        failsafe_triggered: Arc<AtomicBool>,
-    ) {
-        spawn_tracked!(async move {
-            let mut interval = tokio::time::interval(DB_CHECK_INTERVAL);
-            loop {
-                interval.tick().await;
-
-                if failsafe_triggered.load(Ordering::Relaxed) {
-                    flag.store(false, Ordering::Relaxed);
-                    return;
-                }
-
-                match db.check_adjustments_enabled().await {
-                    Ok(value) => {
-                        let previous = flag.swap(value, Ordering::Relaxed);
-                        if previous != value {
-                            tracing::info!(
-                                "adjustments enabled flag changed from {} to {}",
-                                previous,
-                                value
-                            );
-                        }
-                    }
-                    Err(e) => tracing::error!("failed to check adjustments_enabled flag: {}", e),
-                }
-            }
-        });
-    }
-
     fn spawn_adjustments_dry_run_task(
         auctioneer: crossbeam_channel::Sender<Event>,
         adjustments_enabled: Arc<AtomicBool>,
diff --git a/crates/relay/src/auctioneer/get_header.rs b/crates/relay/src/auctioneer/get_header.rs
index 7f656427..0505fb82 100644
--- a/crates/relay/src/auctioneer/get_header.rs
+++ b/crates/relay/src/auctioneer/get_header.rs
@@ -1,7 +1,5 @@
 use std::sync::atomic::Ordering;
 
-use std::sync::atomic::Ordering;
-
 use alloy_primitives::B256;
 use helix_common::api::proposer_api::GetHeaderParams;
 use tokio::sync::oneshot;
@@ -42,7 +40,7 @@ impl Context {
             return Ok(merged_bid);
         };
 
-        if self.adjustments_enabled.load(Ordering::Relaxed) &&
+        if self.cache.adjustments_enabled.load(Ordering::Relaxed) &&
             let Some((adjusted_bid, sim_request)) =
                 self.bid_adjustor.try_apply_adjustments(original_bid, slot_data, false)
         {
diff --git a/crates/relay/src/auctioneer/mod.rs b/crates/relay/src/auctioneer/mod.rs
index 7f2b5b47..e284b2bf 100644
--- a/crates/relay/src/auctioneer/mod.rs
+++ b/crates/relay/src/auctioneer/mod.rs
@@ -37,9 +37,6 @@ use tracing::{debug, error, info, info_span, trace, warn};
 pub use types::{
     Event, GetPayloadResultData, PayloadBidData, PayloadEntry, SlotData, SubmissionPayload,
 };
-pub use types::{
-    Event, GetPayloadResultData, PayloadBidData, PayloadEntry, SlotData, SubmissionPayload,
-};
 use worker::{RegWorker, SubWorker};
 
 pub use crate::auctioneer::{
diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs
index cf1c89ff..d3719b87 100644
--- a/crates/relay/src/auctioneer/submit_block.rs
+++ b/crates/relay/src/auctioneer/submit_block.rs
@@ -3,14 +3,14 @@ use std::time::Instant;
 
 use alloy_primitives::{Address, B256, U256};
 use helix_common::{
     self, BuilderInfo, SubmissionTrace, bid_submission::OptimisticVersion,
-    metrics::HYDRATION_CACHE_HITS, record_submission_step, spawn_tracked,
+    metrics::HYDRATION_CACHE_HITS, record_submission_step,
 };
 use helix_types::{
     BidAdjustmentData, BlockValidationError, MergeableOrdersWithPref, SignedBidSubmission,
     SubmissionVersion,
 };
 use tokio::sync::oneshot;
-use tracing::{error, trace, warn};
+use tracing::{trace, warn};
 
 use crate::{
     api::builder::error::BuilderApiError,
@@ -249,14 +249,7 @@ impl Context {
         let trace_clone = req.trace.clone();
         let opt_version = req.optimistic_version();
 
-        let db = self.db.clone();
-        spawn_tracked!(async move {
-            if let Err(err) =
-                db.store_block_submission(sub_clone, trace_clone, opt_version, is_adjusted).await
-            {
-                error!(%err, "failed to store block submission")
-            }
-        });
+        self.db.store_block_submission(sub_clone, trace_clone, opt_version, is_adjusted);
 
         self.sim_manager.handle_sim_request(req, fast_track);
     }
diff --git a/crates/relay/src/database/handle.rs b/crates/relay/src/database/handle.rs
index 2e747893..94b3f099 100644
--- a/crates/relay/src/database/handle.rs
+++ b/crates/relay/src/database/handle.rs
@@ -14,7 +14,7 @@ use helix_common::{
     bid_submission::OptimisticVersion,
     utils::alert_discord,
 };
-use helix_types::{BlsPublicKeyBytes, SignedBidSubmission};
+use helix_types::{BlsPublicKeyBytes, MergedBlock, SignedBidSubmission};
 use tracing::error;
 
 use crate::database::{
@@ -96,11 +96,13 @@ impl DbHandle {
         submission: SignedBidSubmission,
         trace: SubmissionTrace,
         optimistic_version: OptimisticVersion,
+        is_adjusted: bool,
     ) {
         if let Err(err) = self.batch_sender.send(PendingBlockSubmissionValue {
             submission,
             trace,
             optimistic_version,
+            is_adjusted,
         }) {
             error!(%err, "failed to store block submission");
         }
@@ -203,4 +205,10 @@ impl DbHandle {
             ));
         }
     }
+
+    pub fn save_merged_blocks(&self, blocks: Vec<MergedBlock>) {
+        if let Err(err) = self.sender.try_send(DbRequest::SaveMergedBlocks { blocks }) {
+            error!(%err, "failed to send SaveMergedBlocks request");
+        }
+    }
 }
diff --git a/crates/relay/src/database/postgres/postgres_db_service.rs b/crates/relay/src/database/postgres/postgres_db_service.rs
index 28234dba..3d89d1bc 100644
--- a/crates/relay/src/database/postgres/postgres_db_service.rs
+++ b/crates/relay/src/database/postgres/postgres_db_service.rs
@@ -26,11 +26,8 @@ use helix_common::{
     metrics::DbMetricRecord,
     utils::{alert_discord, utcnow_ms},
 };
-use helix_types::{
-    BlsPublicKeyBytes, MergedBlock, SignedBidSubmission, Slot,
-};
+use helix_types::{BlsPublicKeyBytes, MergedBlock, SignedBidSubmission, Slot};
 use rustc_hash::FxHashSet;
-use tokio::sync::oneshot;
 use tokio_postgres::{NoTls, types::ToSql};
 use tracing::{error, info, instrument, warn};
 
@@ -48,14 +45,6 @@ use crate::database::{
 };
 
 pub enum DbRequest {
-    GetValidatorRegistration {
-        pub_key: BlsPublicKeyBytes,
-        response: oneshot::Sender<Result<SignedValidatorRegistrationEntry, DatabaseError>>,
-    },
-    GetValidatorRegistrationsForPubKeys {
-        pub_keys: Vec<BlsPublicKeyBytes>,
-        response: oneshot::Sender<Result<Vec<SignedValidatorRegistrationEntry>, DatabaseError>>,
-    },
     SaveTooLateGetPayload {
         slot: u64,
         proposer_pub_key: BlsPublicKeyBytes,
@@ -76,16 +65,6 @@ pub enum DbRequest {
         reason: String,
         failsafe_triggered: Arc<AtomicBool>,
     },
-    GetBids {
-        filters: BidFilters,
-        validator_preferences: Arc<ValidatorPreferences>,
-        response: oneshot::Sender, DatabaseError>>,
-    },
-    GetDeliveredPayloads {
-        filters: BidFilters,
-        validator_preferences: Arc<ValidatorPreferences>,
-        response: oneshot::Sender, DatabaseError>>,
-    },
     SaveGetHeaderCall {
         params: GetHeaderParams,
         best_block_hash: B256,
@@ -104,26 +83,15 @@ pub enum DbRequest {
         block_hash: B256,
         trace: GossipedPayloadTrace,
     },
-    GetTrustedProposers {
-        response: oneshot::Sender<Result<Vec<ProposerInfo>, DatabaseError>>,
-    },
     SaveInclusionList {
         inclusion_list: InclusionListWithMetadata,
         slot: u64,
        block_parent_hash: B256,
        proposer_pubkey: BlsPublicKeyBytes,
     },
-    GetBlockAdjustmentsForSlot {
-        slot: Slot,
-        response: oneshot::Sender<Result<Vec<DataAdjustmentsEntry>, DatabaseError>>,
-    },
     SaveBlockAdjustmentsData {
         entry: DataAdjustmentsEntry,
     },
-    GetProposerHeaderDelivered {
-        params: ProposerHeaderDeliveredParams,
-        response: oneshot::Sender, DatabaseError>>,
-    },
     SetKnownValidators {
         known_validators: Vec,
     },
@@ -135,6 +103,9 @@ pub enum DbRequest {
         failsafe_trigger: Arc<AtomicBool>,
         adjustments_enabled: Arc<AtomicBool>,
     },
+    SaveMergedBlocks {
+        blocks: Vec<MergedBlock>,
+    },
 }
 
 pub struct PendingBlockSubmissionValue {
@@ -515,15 +486,6 @@ impl PostgresDatabaseService {
 
     async fn handle_db_request(&self, request: DbRequest) {
         match request {
-            DbRequest::GetValidatorRegistration { pub_key, response } => {
-                let result = self.get_validator_registration(&pub_key).await;
-                let _ = response.send(result);
-            }
-            DbRequest::GetValidatorRegistrationsForPubKeys { pub_keys, response } => {
-                let pub_key_refs: Vec<&BlsPublicKeyBytes> = pub_keys.iter().collect();
-                let result = self.get_validator_registrations_for_pub_keys(&pub_key_refs).await;
-                let _ = response.send(result);
-            }
             DbRequest::SaveTooLateGetPayload {
                 slot,
                 proposer_pub_key,
@@ -572,14 +534,6 @@ impl PostgresDatabaseService {
                     ));
                 }
             }
-            DbRequest::GetBids { filters, validator_preferences, response } => {
-                let result = self.get_bids(&filters, validator_preferences).await;
-                let _ = response.send(result);
-            }
-            DbRequest::GetDeliveredPayloads { filters, validator_preferences, response } => {
-                let result = self.get_delivered_payloads(&filters, validator_preferences).await;
-                let _ = response.send(result);
-            }
             DbRequest::SaveGetHeaderCall {
                 params,
                 best_block_hash,
@@ -613,10 +567,6 @@ impl PostgresDatabaseService {
                     error!(%err, "failed to store gossiped payload trace")
                 }
             }
-            DbRequest::GetTrustedProposers { response } => {
-                let result = self.get_trusted_proposers().await;
-                let _ = response.send(result);
-            }
             DbRequest::SaveInclusionList {
                 inclusion_list,
                 slot,
@@ -635,19 +585,11 @@ impl PostgresDatabaseService {
                     error!(%slot, "failed to save inclusion list Errors: {:?}", err);
                 }
             }
-            DbRequest::GetBlockAdjustmentsForSlot { slot, response } => {
-                let result = self.get_block_adjustments_for_slot(slot).await;
-                let _ = response.send(result);
-            }
             DbRequest::SaveBlockAdjustmentsData { entry } => {
                 if let Err(err) = self.save_block_adjustments_data(entry).await {
                     error!(%err, "failed to save block adjustments data");
                 }
             }
-            DbRequest::GetProposerHeaderDelivered { params, response } => {
-                let result = self.get_proposer_header_delivered(&params).await;
-                let _ = response.send(result);
-            }
             DbRequest::SetKnownValidators { known_validators } => {
                 if let Err(err) = self.set_known_validators(known_validators).await {
                     error!(%err, "failed to set known validators");
@@ -669,6 +611,11 @@ impl PostgresDatabaseService {
                     ));
                 }
             }
+            DbRequest::SaveMergedBlocks { blocks } => {
+                if let Err(err) = self.save_merged_blocks(&blocks).await {
+                    error!(%err, "failed to save merged blocks");
+                }
+            }
         }
     }
 
diff --git a/crates/relay/src/housekeeper/housekeeper.rs b/crates/relay/src/housekeeper/housekeeper.rs
index a3b257ac..2dfabc58 100644
--- a/crates/relay/src/housekeeper/housekeeper.rs
+++ b/crates/relay/src/housekeeper/housekeeper.rs
@@ -227,25 +227,6 @@ impl Housekeeper {
             }
         }
 
-        // builder info updated every slot
-        let housekeeper = self.clone();
-        task::spawn(
-            file!(),
-            line!(),
-            async move {
-                let Ok(_guard) = housekeeper.slots.updating_builder_infos.try_lock() else {
-                    warn!("builder info update already in progress");
-                    return;
-                };
-                let start = 
Instant::now(); - if let Err(err) = housekeeper.sync_builder_info_changes().await { - error!(%err, "failed to sync builder info changes"); - } - info!(duration = ?start.elapsed(), "sync builder info task completed"); - } - .in_current_span(), - ); - // save merged blocks let housekeeper = self.clone(); spawn_tracked!(async move { @@ -255,39 +236,10 @@ impl Housekeeper { }; let start = Instant::now(); let merged_blocks = housekeeper.auctioneer.get_merged_blocks(); - if let Err(err) = housekeeper.db.save_merged_blocks(&merged_blocks).await { - error!(%err, "failed to save merged blocks"); - } + housekeeper.db.save_merged_blocks(merged_blocks); housekeeper.auctioneer.clear_merged_blocks(); info!(duration = ?start.elapsed(), "save merged blocks task completed"); }); - - // trusted proposers - if self.should_update_trusted_proposers(head_slot.as_u64()) { - if is_local_dev() { - warn!("skipping refresh of trusted proposers") - } else { - let housekeeper = self.clone(); - task::spawn( - file!(), - line!(), - async move { - let Ok(_guard) = housekeeper.slots.updating_trusted_proposers.try_lock() else { - warn!("trusted proposer update already in progress"); - return; - }; - let start = Instant::now(); - if let Err(err) = housekeeper.update_trusted_proposers().await { - error!(%err, "failed to update trusted proposers"); - } else { - housekeeper.slots.update_trusted_proposers(head_slot); - } - info!(duration = ?start.elapsed(), "update trusted proposers task completed"); - } - .in_current_span(), - ); - } - } } #[tracing::instrument(skip_all, name = "update_proposer_duties", fields(epoch = %epoch))] diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index db68ef5c..4e988581 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -12,7 +12,7 @@ pub use crate::{ auctioneer::{ AuctioneerHandle, Event, PayloadEntry, RegWorkerHandle, SimulatorClient, SimulatorRequest, SlotData, SubmissionPayload, spawn_workers, - , SimulatorRequest, SlotData, SubmissionPayload}, + }, beacon::start_beacon_client, database::{ DbRequest, PendingBlockSubmissionValue, handle::DbHandle, diff --git a/crates/simulator/src/block_merging/mod.rs b/crates/simulator/src/block_merging/mod.rs index 1fc69a15..e74a2cfe 100644 --- a/crates/simulator/src/block_merging/mod.rs +++ b/crates/simulator/src/block_merging/mod.rs @@ -48,9 +48,8 @@ use crate::{ block_merging::{ error::BlockMergingApiError, types::{ - BlockMergeRequestV1, BlockMergeResponseV1, DistributionConfig, - MergeableOrderBytes, MergeableOrderRecovered, RecoveredTx, SignedTx, SimulatedOrder, - SimulationError, + BlockMergeRequestV1, BlockMergeResponseV1, DistributionConfig, MergeableOrderBytes, + MergeableOrderRecovered, RecoveredTx, SignedTx, SimulatedOrder, SimulationError, }, }, common::CachedRethDb,
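
---

Note on the pattern above (illustrative, not part of either patch): both commits route every hot-path database write through a cloneable handle that enqueues a request on a bounded channel, while a single worker owns the slow Postgres connection and drains the queue. The standalone Rust sketch below shows the same fire-and-forget shape under stated assumptions: the names `Request`, `Handle`, and `run_worker` are hypothetical stand-ins for `DbRequest`, `DbHandle`, and the Postgres request loop, the only dependency assumed is the `crossbeam-channel` crate the relay already uses, and a plain thread stands in for the tokio runtime.

use std::thread;

use crossbeam_channel::{Receiver, Sender, bounded};

// Hypothetical request enum mirroring the shape of `DbRequest`.
#[derive(Debug)]
enum Request {
    SaveMergedBlocks { blocks: Vec<u64> },
    DemoteBuilder { builder: String, reason: String },
}

// Cheaply cloneable handle mirroring `DbHandle`: callers enqueue and move on.
#[derive(Clone)]
struct Handle {
    sender: Sender<Request>,
}

impl Handle {
    // Fire-and-forget: a full or closed queue is logged, never awaited.
    fn save_merged_blocks(&self, blocks: Vec<u64>) {
        if let Err(err) = self.sender.try_send(Request::SaveMergedBlocks { blocks }) {
            eprintln!("failed to send SaveMergedBlocks request: {err}");
        }
    }

    fn demote_builder(&self, builder: String, reason: String) {
        if let Err(err) = self.sender.try_send(Request::DemoteBuilder { builder, reason }) {
            eprintln!("failed to send DemoteBuilder request: {err}");
        }
    }
}

// The worker is the only place that touches the (slow) database.
fn run_worker(receiver: Receiver<Request>) {
    while let Ok(req) = receiver.recv() {
        match req {
            Request::SaveMergedBlocks { blocks } => {
                println!("persisting {} merged blocks", blocks.len());
            }
            Request::DemoteBuilder { builder, reason } => {
                println!("demoting builder {builder}: {reason}");
            }
        }
    }
}

fn main() {
    let (tx, rx) = bounded(10_000);
    let worker = thread::spawn(move || run_worker(rx));

    let handle = Handle { sender: tx };
    handle.save_merged_blocks(vec![1, 2, 3]);
    handle.demote_builder("builder-a".into(), "simulation error".into());

    drop(handle); // dropping the last sender lets the worker drain and exit
    worker.join().unwrap();
}

The design trades durability for latency: a write is dropped and logged rather than ever blocking block submission, which is why the patches pair the pattern with failsafe flags (`failsafe_triggered`, `adjustments_failsafe_trigger`) for the writes that must not fail silently.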