diff --git a/Cargo.lock b/Cargo.lock index 2346db1c..a22d83f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5004,6 +5004,7 @@ dependencies = [ "prometheus", "rand 0.9.2", "reqwest 0.11.27", + "rustc-hash", "serde", "serde_json", "serde_yaml", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 3de10245..0eb9d73b 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -32,6 +32,7 @@ parking_lot.workspace = true prometheus.workspace = true rand.workspace = true reqwest.workspace = true +rustc-hash.workspace = true serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/crates/common/src/local_cache.rs b/crates/common/src/local_cache.rs index c4e94345..092b9327 100644 --- a/crates/common/src/local_cache.rs +++ b/crates/common/src/local_cache.rs @@ -9,22 +9,28 @@ use axum::{ response::{IntoResponse, Response}, }; use dashmap::{DashMap, DashSet}; -use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock}; +use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock, SignedValidatorRegistration}; use http::HeaderValue; use parking_lot::RwLock; +use rustc_hash::FxHashSet; use tracing::error; use crate::{ - BuilderConfig, BuilderInfo, ProposerInfo, - api::builder_api::{ - BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata, - SlotCoordinate, + BuilderConfig, BuilderInfo, ProposerInfo, SignedValidatorRegistrationEntry, + api::{ + builder_api::{ + BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata, + SlotCoordinate, + }, + proposer_api::ValidatorRegistrationInfo, }, + chain_info::ChainInfo, }; const ESTIMATED_TRUSTED_PROPOSERS: usize = 200_000; const ESTIMATED_BUILDER_INFOS_UPPER_BOUND: usize = 1000; const MAX_PRIMEV_PROPOSERS: usize = 64; +const VALIDATOR_REGISTRATION_UPDATE_INTERVAL: u64 = 60 * 60; // 1 hour in seconds #[derive(Debug, thiserror::Error)] pub enum AuctioneerError { @@ -94,6 +100,14 @@ pub struct LocalCache { kill_switch: Arc, proposer_duties: Arc>>, merged_blocks: Arc>, + pub validator_registration_cache: + Arc>, + pub pending_validator_registrations: Arc>, + pub known_validators_cache: Arc>>, + pub validator_pool_cache: Arc>, + chain_info: Option>, + pub adjustments_enabled: Arc, + pub adjustments_failsafe_trigger: Arc, } impl LocalCache { @@ -104,8 +118,17 @@ impl LocalCache { let trusted_proposers = Arc::new(DashMap::with_capacity(ESTIMATED_TRUSTED_PROPOSERS)); let primev_proposers = Arc::new(DashSet::with_capacity(MAX_PRIMEV_PROPOSERS)); let kill_switch = Arc::new(AtomicBool::new(false)); - let proposer_duties = Arc::new(RwLock::new(Vec::new())); + let proposer_duties = Arc::new(RwLock::new(Vec::with_capacity(1000))); let merged_blocks = Arc::new(DashMap::with_capacity(1000)); + let validator_registration_cache = Arc::new(DashMap::with_capacity(1_8000_000)); + let pending_validator_registrations = Arc::new(DashSet::with_capacity(20_000)); + let known_validators_cache = Arc::new(RwLock::new(FxHashSet::with_capacity_and_hasher( + 1_200_000, + Default::default(), + ))); + let validator_pool_cache = Arc::new(DashMap::with_capacity(1000)); + let adjustments_enabled = Arc::new(AtomicBool::new(false)); + let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false)); Self { inclusion_list: Default::default(), @@ -116,6 +139,13 @@ impl LocalCache { kill_switch, proposer_duties, merged_blocks, + validator_registration_cache, + pending_validator_registrations, + known_validators_cache, + validator_pool_cache, + chain_info: None, + adjustments_enabled, + adjustments_failsafe_trigger, } } @@ -234,6 +264,74 @@ impl LocalCache { self.merged_blocks.get(block_hash).map(|b| b.value().clone()) } + pub fn is_registration_update_required( + &self, + registration: &SignedValidatorRegistration, + ) -> bool { + if let Some(existing_entry) = + self.validator_registration_cache.get(®istration.message.pubkey) && + existing_entry.registration_info.registration.message.timestamp >= + registration + .message + .timestamp + .saturating_sub(VALIDATOR_REGISTRATION_UPDATE_INTERVAL) && + existing_entry.registration_info.registration.message.fee_recipient == + registration.message.fee_recipient && + existing_entry.registration_info.registration.message.gas_limit == + registration.message.gas_limit + { + // do registration once per hour, unless fee recipient / gas limit has changed + + return false; + } + true + } + + /// Assume the entries are already validated + pub fn save_validator_registrations( + &self, + entries: impl Iterator, + pool_name: Option, + user_agent: Option, + ) { + for entry in entries { + self.pending_validator_registrations.insert(entry.registration.message.pubkey); + self.validator_registration_cache.insert( + entry.registration.message.pubkey, + SignedValidatorRegistrationEntry::new( + entry.clone(), + pool_name.clone(), + user_agent.clone(), + ), + ); + } + } + + pub fn get_validator_pool_name(&self, api_key: &str) -> Option { + self.validator_pool_cache.get(api_key).map(|v| v.value().clone()) + } + + pub fn get_chain_info(&self) -> Option> { + self.chain_info.clone() + } + + pub fn set_chain_info(&mut self, chain_info: Arc) { + self.chain_info = Some(chain_info); + } + + pub fn get_validator_registrations_for_pub_keys( + &self, + pub_keys: &[BlsPublicKeyBytes], + ) -> Vec { + let mut registrations = Vec::with_capacity(pub_keys.len()); + for pub_key in pub_keys { + if let Some(entry) = self.validator_registration_cache.get(pub_key) { + registrations.push(entry.clone()); + } + } + registrations + } + pub fn get_merged_blocks(&self) -> Vec { self.merged_blocks.iter().map(|b| b.value().clone()).collect() } diff --git a/crates/relay/src/api/builder/api.rs b/crates/relay/src/api/builder/api.rs index b87bf00a..40104026 100644 --- a/crates/relay/src/api/builder/api.rs +++ b/crates/relay/src/api/builder/api.rs @@ -4,8 +4,8 @@ use axum::{Extension, http::StatusCode, response::IntoResponse}; use helix_common::{RelayConfig, api::builder_api::TopBidUpdate, local_cache::LocalCache}; use crate::{ - api::Api, auctioneer::AuctioneerHandle, - database::postgres::postgres_db_service::PostgresDatabaseService, housekeeper::CurrentSlotInfo, + api::Api, auctioneer::AuctioneerHandle, database::handle::DbHandle, + housekeeper::CurrentSlotInfo, }; pub(crate) const MAX_PAYLOAD_LENGTH: usize = 1024 * 1024 * 20; // 20MB @@ -13,7 +13,7 @@ pub(crate) const MAX_PAYLOAD_LENGTH: usize = 1024 * 1024 * 20; // 20MB #[derive(Clone)] pub struct BuilderApi { pub local_cache: Arc, - pub db: Arc, + pub db: DbHandle, pub curr_slot_info: CurrentSlotInfo, pub relay_config: Arc, /// Subscriber for TopBid updates, SSZ encoded @@ -25,7 +25,7 @@ pub struct BuilderApi { impl BuilderApi { pub fn new( local_cache: Arc, - db: Arc, + db: DbHandle, relay_config: RelayConfig, curr_slot_info: CurrentSlotInfo, top_bid_tx: tokio::sync::broadcast::Sender, diff --git a/crates/relay/src/api/builder/gossip.rs b/crates/relay/src/api/builder/gossip.rs index aa8dc8f5..e9c0881d 100644 --- a/crates/relay/src/api/builder/gossip.rs +++ b/crates/relay/src/api/builder/gossip.rs @@ -1,4 +1,4 @@ -use helix_common::{GossipedPayloadTrace, spawn_tracked, utils::utcnow_ns}; +use helix_common::{GossipedPayloadTrace, utils::utcnow_ns}; use tracing::{debug, error}; use uuid::Uuid; @@ -20,11 +20,6 @@ impl BuilderApi { } // Save gossiped payload trace to db - let db = self.db.clone(); - spawn_tracked!(async move { - if let Err(err) = db.save_gossiped_payload_trace(block_hash, trace).await { - error!(%err, "failed to store gossiped payload trace") - } - }); + self.db.save_gossiped_payload_trace(block_hash, trace); } } diff --git a/crates/relay/src/api/proposer/get_header.rs b/crates/relay/src/api/proposer/get_header.rs index 1012ea5d..2279000e 100644 --- a/crates/relay/src/api/proposer/get_header.rs +++ b/crates/relay/src/api/proposer/get_header.rs @@ -115,24 +115,13 @@ impl ProposerApi { let bid_block_hash = *bid.block_hash(); let value = *bid.value(); - let db = proposer_api.db.clone(); - spawn_tracked!( - async move { - if let Err(err) = db - .save_get_header_call( - params, - bid_block_hash, - value, - trace, - is_mev_boost, - user_agent, - ) - .await - { - error!(%err, "error saving get header call to database"); - } - } - .in_current_span() + proposer_api.db.save_get_header_call( + params, + bid_block_hash, + value, + trace, + is_mev_boost, + user_agent, ); let fork = proposer_api.chain_info.current_fork_name(); diff --git a/crates/relay/src/api/proposer/get_payload.rs b/crates/relay/src/api/proposer/get_payload.rs index 88750d42..d658374a 100644 --- a/crates/relay/src/api/proposer/get_payload.rs +++ b/crates/relay/src/api/proposer/get_payload.rs @@ -124,13 +124,12 @@ impl ProposerApi { Ok(get_payload_response) => Ok(axum::Json(get_payload_response)), Err(err) => { // Save error to DB - if let Err(err) = proposer_api - .db - .save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace) - .await - { - error!(err = ?err, "error saving failed get payload"); - } + proposer_api.db.save_failed_get_payload( + slot.into(), + block_hash, + err.to_string(), + trace, + ); Err(err) } @@ -220,13 +219,12 @@ impl ProposerApi { Ok(_get_payload_response) => Ok(StatusCode::ACCEPTED), Err(err) => { // Save error to DB - if let Err(err) = proposer_api - .db - .save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace) - .await - { - error!(err = ?err, "error saving failed get payload"); - } + proposer_api.db.save_failed_get_payload( + slot.into(), + block_hash, + err.to_string(), + trace, + ); Err(err) } @@ -293,19 +291,13 @@ impl ProposerApi { warn!(error = %err, "get_payload was sent too late"); // Save too late request to db for debugging - if let Err(err) = self - .db - .save_too_late_get_payload( - (head_slot + 1).into(), - &proposer_public_key, - &block_hash, - trace.receive, - trace.payload_fetched, - ) - .await - { - error!(%err, "failed to save too late get payload"); - } + self.db.save_too_late_get_payload( + (head_slot + 1).into(), + proposer_public_key, + block_hash, + trace.receive, + trace.payload_fetched, + ); return Err(err); } @@ -441,7 +433,6 @@ impl ProposerApi { user_agent: Option, filtering: Filtering, ) { - let db = self.db.clone(); let trace = *trace; let params = SavePayloadParams { slot, @@ -454,11 +445,8 @@ impl ProposerApi { user_agent, filtering, }; - spawn_tracked!(async move { - if let Err(err) = db.save_delivered_payload(¶ms).await { - error!(%err, "error saving payload to database"); - } - }); + + self.db.save_delivered_payload(params); } pub(crate) async fn gossip_payload( diff --git a/crates/relay/src/api/proposer/mod.rs b/crates/relay/src/api/proposer/mod.rs index a5834ce4..541b6e73 100644 --- a/crates/relay/src/api/proposer/mod.rs +++ b/crates/relay/src/api/proposer/mod.rs @@ -19,7 +19,7 @@ use crate::{ api::{Api, router::Terminating}, auctioneer::{AuctioneerHandle, RegWorkerHandle}, beacon::multi_beacon_client::MultiBeaconClient, - database::postgres::postgres_db_service::PostgresDatabaseService, + database::handle::DbHandle, gossip::GrpcGossiperClientManager, housekeeper::CurrentSlotInfo, }; @@ -27,7 +27,7 @@ use crate::{ #[derive(Clone)] pub struct ProposerApi { pub local_cache: Arc, - pub db: Arc, + pub db: DbHandle, pub gossiper: Arc, pub multi_beacon_client: Arc, pub api_provider: Arc, @@ -45,7 +45,7 @@ pub struct ProposerApi { impl ProposerApi { pub fn new( local_cache: Arc, - db: Arc, + db: DbHandle, gossiper: Arc, api_provider: Arc, signing_context: Arc, diff --git a/crates/relay/src/api/proposer/register.rs b/crates/relay/src/api/proposer/register.rs index 4e1cedb5..96557fcd 100644 --- a/crates/relay/src/api/proposer/register.rs +++ b/crates/relay/src/api/proposer/register.rs @@ -58,7 +58,7 @@ impl ProposerApi { let api_key = headers.get(HEADER_API_KEY).and_then(|key| key.to_str().ok()); let pool_name = match api_key { - Some(api_key) => match proposer_api.db.get_validator_pool_name(api_key).await? { + Some(api_key) => match proposer_api.local_cache.get_validator_pool_name(api_key) { Some(pool_name) => Some(pool_name), None => { warn!("Invalid api key provided"); @@ -119,7 +119,7 @@ impl ProposerApi { let mut skipped_registrations = 0; let registrations_to_check: Vec<_> = { - let known_validators_guard = proposer_api.db.known_validators_cache().read(); + let known_validators_guard = proposer_api.local_cache.known_validators_cache.read(); registrations .into_iter() .filter(|reg| { @@ -131,7 +131,7 @@ impl ProposerApi { } }) .filter(|reg| { - if proposer_api.db.is_registration_update_required(reg) { + if proposer_api.local_cache.is_registration_update_required(reg) { true } else { skipped_registrations += 1; @@ -208,7 +208,11 @@ impl ProposerApi { } }); - proposer_api.db.save_validator_registrations(registrations_to_save, pool_name, user_agent); + proposer_api.local_cache.save_validator_registrations( + registrations_to_save, + pool_name, + user_agent, + ); info!( ?process_time, diff --git a/crates/relay/src/api/proposer/tests.rs b/crates/relay/src/api/proposer/tests.rs index 2a2e0f2a..c5e2969f 100644 --- a/crates/relay/src/api/proposer/tests.rs +++ b/crates/relay/src/api/proposer/tests.rs @@ -1123,36 +1123,4 @@ mod proposer_api_tests { let _ = tx.send(()); } - - #[tokio::test] - async fn test_get_validator_preferences() { - let (tx, http_config, _api, _curr_slot_info, _auctioneer) = start_api_server().await; - - let validator_pubkey = get_fixed_pubkey(0); - - let req_url = format!( - "{}{}{}/{}", - http_config.base_url(), - PATH_PROPOSER_API, - PATH_GET_VALIDATOR_PREFERENCES, - validator_pubkey.as_hex_string() - ); - - let resp = reqwest::Client::new() - .get(&req_url) - .header("X-Api-Key", "valid-api-key") - .send() - .await - .unwrap(); - - assert_eq!(resp.status(), StatusCode::OK); - - let body = resp.text().await.unwrap(); - let preferences: ValidatorPreferences = serde_json::from_str(&body).unwrap(); - - assert!(preferences.header_delay); - assert!(preferences.trusted_builders.is_none()); - - let _ = tx.send(()); - } } diff --git a/crates/relay/src/api/service.rs b/crates/relay/src/api/service.rs index 9d9465a8..3096ebcc 100644 --- a/crates/relay/src/api/service.rs +++ b/crates/relay/src/api/service.rs @@ -12,7 +12,7 @@ use moka::sync::Cache; use tracing::{error, info}; use crate::{ - BidAdjustor, + PostgresDatabaseService, api::{ Api, builder::api::BuilderApi, @@ -20,31 +20,31 @@ use crate::{ relay_data::{BidsCache, DataApi, DeliveredPayloadsCache, SelectiveExpiry}, router::build_router, }, - auctioneer::{Event, spawn_workers}, + auctioneer::{AuctioneerHandle, RegWorkerHandle}, beacon::multi_beacon_client::MultiBeaconClient, - database::postgres::postgres_db_service::PostgresDatabaseService, + database::handle::DbHandle, gossip::{GrpcGossiperClientManager, process_gossip_messages}, housekeeper::CurrentSlotInfo, network::api::RelayNetworkApi, }; pub(crate) const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); -pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(20); pub async fn start_api_service( mut config: RelayConfig, db: Arc, + db_handle: DbHandle, local_cache: Arc, current_slot_info: CurrentSlotInfo, chain_info: Arc, relay_signing_context: Arc, multi_beacon_client: Arc, api_provider: Arc, - bid_adjustor: impl BidAdjustor, known_validators_loaded: Arc, terminating: Arc, top_bid_tx: tokio::sync::broadcast::Sender, - event_channel: (crossbeam_channel::Sender, crossbeam_channel::Receiver), + auctioneer_handle: AuctioneerHandle, + registrations_handle: RegWorkerHandle, relay_network_api: RelayNetworkApi, ) { let gossiper = Arc::new( @@ -57,20 +57,9 @@ pub async fn start_api_service( let (gossip_sender, gossip_receiver) = tokio::sync::mpsc::channel(10_000); - // spawn auctioneer - let (auctioneer_handle, registrations_handle) = spawn_workers( - Arc::unwrap_or_clone(chain_info.clone()), - config.clone(), - db.clone(), - Arc::unwrap_or_clone(local_cache.clone()), - bid_adjustor, - top_bid_tx.clone(), - event_channel, - ); - let builder_api = BuilderApi::::new( local_cache.clone(), - db.clone(), + db_handle.clone(), config.clone(), current_slot_info.clone(), top_bid_tx, @@ -83,7 +72,7 @@ pub async fn start_api_service( let proposer_api = Arc::new(ProposerApi::::new( local_cache, - db.clone(), + db_handle.clone(), gossiper.clone(), api_provider.clone(), relay_signing_context, diff --git a/crates/relay/src/auctioneer/block_merger.rs b/crates/relay/src/auctioneer/block_merger.rs index be86f548..ead3fdbb 100644 --- a/crates/relay/src/auctioneer/block_merger.rs +++ b/crates/relay/src/auctioneer/block_merger.rs @@ -8,8 +8,7 @@ use alloy_consensus::{Bytes48, Transaction, TxEip4844, TxEnvelope, TxType}; use alloy_primitives::{Address, B256, Bytes, U256, hex}; use alloy_rlp::Decodable; use helix_common::{ - RelayConfig, chain_info::ChainInfo, local_cache::LocalCache, metrics::MERGE_TRACE_LATENCY, - utils::utcnow_ms, + RelayConfig, local_cache::LocalCache, metrics::MERGE_TRACE_LATENCY, utils::utcnow_ms, }; use helix_types::{ BlobWithMetadata, BlobWithMetadataV1, BlobWithMetadataV2, BlobsBundle, BlobsBundleVersion, @@ -64,7 +63,6 @@ pub enum OrderValidationError { pub struct BlockMerger { curr_bid_slot: u64, config: RelayConfig, - chain_info: ChainInfo, local_cache: LocalCache, best_merged_block: Option, best_mergeable_orders: BestMergeableOrders, @@ -86,16 +84,10 @@ pub struct BlockMerger { } impl BlockMerger { - pub fn new( - curr_bid_slot: u64, - chain_info: ChainInfo, - local_cache: LocalCache, - config: RelayConfig, - ) -> Self { + pub fn new(curr_bid_slot: u64, local_cache: LocalCache, config: RelayConfig) -> Self { Self { curr_bid_slot, config, - chain_info, local_cache, best_merged_block: None, best_mergeable_orders: BestMergeableOrders::new(), @@ -293,7 +285,11 @@ impl BlockMerger { debug!(?response.builder_inclusions, %response.proposer_value, "preparing merged payload for storage"); let start_time = Instant::now(); let bid_slot = self.curr_bid_slot; - let max_blobs_per_block = self.chain_info.max_blobs_per_block(); + let max_blobs_per_block = self + .local_cache + .get_chain_info() + .expect("chain info should be cached") + .max_blobs_per_block(); let base_block_data = match self.appendable_blocks.get(&response.base_block_hash) { Some(data) => data, @@ -678,15 +674,13 @@ fn get_tx_versioned_hashes(mut raw_tx: &[u8]) -> Vec { Err(err) => { warn!(?err, "failed to decode transaction for versioned hash extraction"); return vec![]; - }, + } }; match tx { - TxEnvelope::Eip4844(tx_eip4844) => { - match tx_eip4844.blob_versioned_hashes() { - Some(vhs) => vhs.to_vec(), - None => vec![], - } - } + TxEnvelope::Eip4844(tx_eip4844) => match tx_eip4844.blob_versioned_hashes() { + Some(vhs) => vhs.to_vec(), + None => vec![], + }, _ => vec![], } } diff --git a/crates/relay/src/auctioneer/context.rs b/crates/relay/src/auctioneer/context.rs index 894cabd6..827a3c3b 100644 --- a/crates/relay/src/auctioneer/context.rs +++ b/crates/relay/src/auctioneer/context.rs @@ -9,8 +9,7 @@ use std::{ use alloy_primitives::{B256, U256}; use helix_common::{ - BuilderInfo, RelayConfig, chain_info::ChainInfo, local_cache::LocalCache, - metrics::SimulatorMetrics, spawn_tracked, utils::alert_discord, + BuilderInfo, RelayConfig, local_cache::LocalCache, metrics::SimulatorMetrics, spawn_tracked, }; use helix_types::{BlsPublicKeyBytes, HydrationCache, Slot, SubmissionVersion}; use rustc_hash::FxHashMap; @@ -26,7 +25,7 @@ use crate::{ simulator::manager::{SimulationResult, SimulatorManager}, types::{PayloadEntry, PendingPayload}, }, - database::postgres::postgres_db_service::PostgresDatabaseService, + database::handle::DbHandle, }; // Context that is only valid for a given slot @@ -44,15 +43,12 @@ pub struct SlotContext { } pub struct Context { - pub chain_info: ChainInfo, pub config: RelayConfig, pub cache: LocalCache, pub unknown_builder_info: BuilderInfo, - pub db: Arc, + pub db: DbHandle, pub slot_context: SlotContext, pub bid_adjustor: B, - pub adjustments_enabled: Arc, - pub adjustments_failsafe_trigger: Arc, } const EXPECTED_PAYLOADS_PER_SLOT: usize = 5000; @@ -65,10 +61,9 @@ impl Context { // TODO: refactor to accept fewer parameters #[allow(clippy::too_many_arguments)] pub fn new( - chain_info: ChainInfo, config: RelayConfig, sim_manager: SimulatorManager, - db: Arc, + db: DbHandle, bid_sorter: BidSorter, cache: LocalCache, bid_adjustor: B, @@ -83,7 +78,7 @@ impl Context { api_key: None, }; - let block_merger = BlockMerger::new(0, chain_info.clone(), cache.clone(), config.clone()); + let block_merger = BlockMerger::new(0, cache.clone(), config.clone()); let slot_context = SlotContext { sim_manager, @@ -102,28 +97,9 @@ impl Context { block_merger, }; - let adjustments_enabled = Arc::new(AtomicBool::new(false)); - let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false)); + Self::spawn_adjustments_dry_run_task(auctioneer, cache.adjustments_enabled.clone()); - Self::spawn_check_flag_task( - db.clone(), - adjustments_enabled.clone(), - adjustments_failsafe_trigger.clone(), - ); - - Self::spawn_adjustments_dry_run_task(auctioneer, adjustments_enabled.clone()); - - Self { - chain_info, - cache, - unknown_builder_info, - slot_context, - db, - config, - bid_adjustor, - adjustments_enabled, - adjustments_failsafe_trigger, - } + Self { cache, unknown_builder_info, slot_context, db, config, bid_adjustor } } pub fn builder_info(&self, builder: &BlsPublicKeyBytes) -> BuilderInfo { @@ -157,26 +133,16 @@ impl Context { result.submission.execution_payload_ref().to_header(None, None) ); - if !self.adjustments_enabled.load(Ordering::Relaxed) { + if !self.cache.adjustments_enabled.load(Ordering::Relaxed) { warn!(%block_hash, "adjustments already disabled"); } else { SimulatorMetrics::disable_adjustments(); - self.adjustments_enabled.store(false, Ordering::Relaxed); - - let db = self.db.clone(); - let failsafe_trigger = self.adjustments_failsafe_trigger.clone(); - let adjustments_enabled = self.adjustments_enabled.clone(); - spawn_tracked!(async move { - if let Err(err) = db.disable_adjustments().await { - failsafe_trigger.store(true, Ordering::Relaxed); - adjustments_enabled.store(false, Ordering::Relaxed); - error!(%block_hash, %err, "failed to disable adjustments in database, pulling the failsafe trigger"); - alert_discord(&format!( - "{} {} failed to disable adjustments in database, pulling the failsafe trigger", - err, block_hash - )); - } - }); + self.cache.adjustments_enabled.store(false, Ordering::Relaxed); + self.db.disable_adjustments( + block_hash, + self.cache.adjustments_failsafe_trigger.clone(), + self.cache.adjustments_enabled.clone(), + ); } } else if self.cache.demote_builder(&builder) { warn!(%builder, %block_hash, %err, "Block simulation resulted in an error. Demoting builder..."); @@ -187,20 +153,13 @@ impl Context { let bid_slot = result.submission.slot(); let failsafe_triggered = self.sim_manager.failsafe_triggered.clone(); - let db = self.db.clone(); - spawn_tracked!(async move { - if let Err(err) = db - .db_demote_builder(bid_slot.as_u64(), &builder, &block_hash, reason) - .await - { - failsafe_triggered.store(true, Ordering::Relaxed); - error!(%builder, %err, %block_hash, "failed to demote builder in database! Pausing all optmistic submissions"); - alert_discord(&format!( - "{} {} {} failed to demote builder in database! Pausing all optmistic submissions", - builder, err, block_hash - )); - } - }); + self.db.db_demote_builder( + bid_slot.as_u64(), + builder, + block_hash, + reason, + failsafe_triggered, + ); } else { warn!(%err, %builder, %block_hash, "builder already demoted, skipping demotion"); } @@ -274,38 +233,6 @@ impl Context { self.payloads.insert(block_hash, payload); } - fn spawn_check_flag_task( - db: Arc, - flag: Arc, - failsafe_triggered: Arc, - ) { - spawn_tracked!(async move { - let mut interval = tokio::time::interval(DB_CHECK_INTERVAL); - loop { - interval.tick().await; - - if failsafe_triggered.load(Ordering::Relaxed) { - flag.store(false, Ordering::Relaxed); - return; - } - - match db.check_adjustments_enabled().await { - Ok(value) => { - let previous = flag.swap(value, Ordering::Relaxed); - if previous != value { - tracing::info!( - "adjustments enabled flag changed from {} to {}", - previous, - value - ); - } - } - Err(e) => tracing::error!("failed to check adjustments_enabled flag: {}", e), - } - } - }); - } - fn spawn_adjustments_dry_run_task( auctioneer: crossbeam_channel::Sender, adjustments_enabled: Arc, diff --git a/crates/relay/src/auctioneer/get_header.rs b/crates/relay/src/auctioneer/get_header.rs index 56c16d71..0505fb82 100644 --- a/crates/relay/src/auctioneer/get_header.rs +++ b/crates/relay/src/auctioneer/get_header.rs @@ -40,7 +40,7 @@ impl Context { return Ok(merged_bid); }; - if self.adjustments_enabled.load(Ordering::Relaxed) && + if self.cache.adjustments_enabled.load(Ordering::Relaxed) && let Some((adjusted_bid, sim_request)) = self.bid_adjustor.try_apply_adjustments(original_bid, slot_data, false) { diff --git a/crates/relay/src/auctioneer/mod.rs b/crates/relay/src/auctioneer/mod.rs index 10ae19b9..e284b2bf 100644 --- a/crates/relay/src/auctioneer/mod.rs +++ b/crates/relay/src/auctioneer/mod.rs @@ -14,7 +14,6 @@ mod worker; use std::{ cmp::Ordering, - sync::Arc, time::{Duration, Instant}, }; @@ -26,7 +25,6 @@ use helix_common::{ api::builder_api::{ BuilderGetValidatorsResponseEntry, InclusionListWithMetadata, TopBidUpdate, }, - chain_info::ChainInfo, local_cache::LocalCache, metrics::{STATE_TRANSITION_COUNT, STATE_TRANSITION_LATENCY, WORKER_QUEUE_LEN, WORKER_UTIL}, record_submission_step, @@ -46,19 +44,17 @@ pub use crate::auctioneer::{ simulator::{SimulatorRequest, client::SimulatorClient}, }; use crate::{ - PostgresDatabaseService, api::{builder::error::BuilderApiError, proposer::ProposerApiError}, auctioneer::{ bid_sorter::BidSorter, context::Context, manager::SimulatorManager, types::PendingPayload, }, + database::handle::DbHandle, housekeeper::PayloadAttributesUpdate, }; -// TODO: tidy up builder and proposer api state, and spawn in a separate function pub fn spawn_workers( - chain_info: ChainInfo, config: RelayConfig, - db: Arc, + db: DbHandle, cache: LocalCache, bid_adjustor: B, top_bid_tx: tokio::sync::broadcast::Sender, @@ -70,7 +66,7 @@ pub fn spawn_workers( if config.is_registration_instance { for core in config.cores.reg_workers.clone() { - let worker = RegWorker::new(core, chain_info.clone()); + let worker = RegWorker::new(core, cache.clone()); let rx = reg_worker_rx.clone(); std::thread::Builder::new() @@ -85,13 +81,7 @@ pub fn spawn_workers( if config.is_submission_instance { for core in config.cores.sub_workers.clone() { - let worker = SubWorker::new( - core, - event_tx.clone(), - cache.clone(), - chain_info.clone(), - config.clone(), - ); + let worker = SubWorker::new(core, event_tx.clone(), cache.clone(), config.clone()); let rx = sub_worker_rx.clone(); std::thread::Builder::new() @@ -108,7 +98,6 @@ pub fn spawn_workers( let bid_sorter = BidSorter::new(top_bid_tx); let sim_manager = SimulatorManager::new(config.simulators.clone(), event_tx.clone()); let ctx = Context::new( - chain_info, config, sim_manager, db, @@ -598,7 +587,8 @@ impl State { match (registration_data, payload_attributes_map.is_empty()) { (Some(registration_data), false) => { - let current_fork = ctx.chain_info.fork_at_slot(bid_slot); + let chain_info = ctx.cache.get_chain_info().expect("chain info should be cached"); + let current_fork = chain_info.fork_at_slot(bid_slot); info!(%bid_slot, attributes = payload_attributes_map.len(), "processed slot data, start sorting"); diff --git a/crates/relay/src/auctioneer/simulator/manager.rs b/crates/relay/src/auctioneer/simulator/manager.rs index 9dff9a11..30e2f109 100644 --- a/crates/relay/src/auctioneer/simulator/manager.rs +++ b/crates/relay/src/auctioneer/simulator/manager.rs @@ -15,14 +15,13 @@ use helix_types::{BlsPublicKeyBytes, SignedBidSubmission, SubmissionVersion}; use tokio::sync::oneshot; use tracing::{debug, error, info, warn}; -use crate::{ - api::service::SIMULATOR_REQUEST_TIMEOUT, - auctioneer::{ - simulator::{BlockMergeRequest, SimulatorRequest, client::SimulatorClient}, - types::{Event, SubmissionResult}, - }, +use crate::auctioneer::{ + simulator::{BlockMergeRequest, SimulatorRequest, client::SimulatorClient}, + types::{Event, SubmissionResult}, }; +pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(20); + #[derive(Default)] struct LocalTelemetry { sims_reqs: usize, diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs index 6c2afc5b..d3719b87 100644 --- a/crates/relay/src/auctioneer/submit_block.rs +++ b/crates/relay/src/auctioneer/submit_block.rs @@ -3,14 +3,14 @@ use std::time::Instant; use alloy_primitives::{Address, B256, U256}; use helix_common::{ self, BuilderInfo, SubmissionTrace, bid_submission::OptimisticVersion, - metrics::HYDRATION_CACHE_HITS, record_submission_step, spawn_tracked, + metrics::HYDRATION_CACHE_HITS, record_submission_step, }; use helix_types::{ BidAdjustmentData, BlockValidationError, MergeableOrdersWithPref, SignedBidSubmission, SubmissionVersion, }; use tokio::sync::oneshot; -use tracing::{error, trace, warn}; +use tracing::{trace, warn}; use crate::{ api::builder::error::BuilderApiError, @@ -117,7 +117,8 @@ impl Context { Submission::Dehydrated(dehydrated) => { trace!("hydrating submission"); let start = Instant::now(); - let max_blobs_per_block = self.chain_info.max_blobs_per_block(); + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + let max_blobs_per_block = chain_info.max_blobs_per_block(); let hydrated = dehydrated.hydrate(&mut self.hydration_cache, max_blobs_per_block)?; @@ -248,14 +249,7 @@ impl Context { let trace_clone = req.trace.clone(); let opt_version = req.optimistic_version(); - let db = self.db.clone(); - spawn_tracked!(async move { - if let Err(err) = - db.store_block_submission(sub_clone, trace_clone, opt_version, is_adjusted).await - { - error!(%err, "failed to store block submission") - } - }); + self.db.store_block_submission(sub_clone, trace_clone, opt_version, is_adjusted); self.sim_manager.handle_sim_request(req, fast_track); } diff --git a/crates/relay/src/auctioneer/worker.rs b/crates/relay/src/auctioneer/worker.rs index 91719464..6347fc0d 100644 --- a/crates/relay/src/auctioneer/worker.rs +++ b/crates/relay/src/auctioneer/worker.rs @@ -91,7 +91,6 @@ pub(super) struct SubWorker { id: String, tx: crossbeam_channel::Sender, cache: LocalCache, - chain_info: ChainInfo, tel: Telemetry, config: RelayConfig, } @@ -101,17 +100,9 @@ impl SubWorker { id: usize, tx: crossbeam_channel::Sender, cache: LocalCache, - chain_info: ChainInfo, config: RelayConfig, ) -> Self { - Self { - id: format!("submission_{id}"), - tx, - cache, - chain_info, - tel: Telemetry::default(), - config, - } + Self { id: format!("submission_{id}"), tx, cache, tel: Telemetry::default(), config } } pub(super) fn run(mut self, rx: crossbeam_channel::Receiver) { @@ -293,23 +284,25 @@ impl SubWorker { block_merging_dry_run: self.config.block_merging_config.is_dry_run, }; + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + let (submission, merging_data, bid_adjustment_data) = match submission_type { Some(SubmissionType::Default) => { - decode_default(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_default(&mut decoder, body, trace, &chain_info, &flags)? } Some(SubmissionType::Merge) => { - decode_merge(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_merge(&mut decoder, body, trace, &chain_info, &flags)? } Some(SubmissionType::Dehydrated) => { - decode_dehydrated(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_dehydrated(&mut decoder, body, trace, &chain_info, &flags)? } None => { if should_hydrate { - decode_dehydrated(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_dehydrated(&mut decoder, body, trace, &chain_info, &flags)? } else if has_mergeable_data { - decode_merge(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_merge(&mut decoder, body, trace, &chain_info, &flags)? } else { - decode_default(&mut decoder, body, trace, &self.chain_info, &flags)? + decode_default(&mut decoder, body, trace, &chain_info, &flags)? } } }; @@ -329,7 +322,8 @@ impl SubWorker { _trace: &mut GetPayloadTrace, ) -> Result<(SignedBlindedBeaconBlock, B256), ProposerApiError> { trace!("verifying signature"); - verify_signed_blinded_block_signature(&self.chain_info, &blinded_block, proposer_pubkey)?; + let chain_info = self.cache.get_chain_info().expect("chain info should be cached"); + verify_signed_blinded_block_signature(&chain_info, &blinded_block, proposer_pubkey)?; trace!("signature verified"); let block_hash = blinded_block @@ -347,13 +341,13 @@ impl SubWorker { /// Worker to process registrations verifications pub(super) struct RegWorker { id: String, - chain_info: ChainInfo, + local_cache: LocalCache, tel: Telemetry, } impl RegWorker { - pub(super) fn new(id: usize, chain_info: ChainInfo) -> Self { - Self { id: format!("registration_{id}"), chain_info, tel: Default::default() } + pub(super) fn new(id: usize, local_cache: LocalCache) -> Self { + Self { id: format!("registration_{id}"), local_cache, tel: Default::default() } } pub(super) fn run(mut self, rx: crossbeam_channel::Receiver) { @@ -396,7 +390,9 @@ impl RegWorker { } let start = Instant::now(); - let valid = validate_registration(&self.chain_info, ®s[i]); + let chain_info = + self.local_cache.get_chain_info().expect("chain info should be cached"); + let valid = validate_registration(&chain_info, ®s[i]); res.push((i, valid.is_ok())); WORKER_TASK_COUNT.with_label_values(&["Registration", &self.id]).inc(); diff --git a/crates/relay/src/database/handle.rs b/crates/relay/src/database/handle.rs new file mode 100644 index 00000000..94b3f099 --- /dev/null +++ b/crates/relay/src/database/handle.rs @@ -0,0 +1,214 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use alloy_primitives::{B256, U256}; +use helix_common::{ + DataAdjustmentsEntry, GetHeaderTrace, GetPayloadTrace, GossipedPayloadTrace, SubmissionTrace, + ValidatorSummary, + api::{ + builder_api::{BuilderGetValidatorsResponseEntry, InclusionListWithMetadata}, + proposer_api::GetHeaderParams, + }, + bid_submission::OptimisticVersion, + utils::alert_discord, +}; +use helix_types::{BlsPublicKeyBytes, MergedBlock, SignedBidSubmission}; +use tracing::error; + +use crate::database::{ + BuilderInfoDocument, SavePayloadParams, + postgres::postgres_db_service::{DbRequest, PendingBlockSubmissionValue}, +}; + +#[derive(Clone)] +pub struct DbHandle { + sender: crossbeam_channel::Sender, + batch_sender: crossbeam_channel::Sender, +} + +impl DbHandle { + pub fn new( + sender: crossbeam_channel::Sender, + batch_sender: crossbeam_channel::Sender, + ) -> Self { + Self { sender, batch_sender } + } + + pub fn save_too_late_get_payload( + &self, + slot: u64, + proposer_pub_key: BlsPublicKeyBytes, + payload_hash: B256, + message_received: u64, + payload_fetched: u64, + ) { + if let Err(err) = self.sender.try_send(DbRequest::SaveTooLateGetPayload { + slot, + proposer_pub_key, + payload_hash, + message_received, + payload_fetched, + }) { + error!(%err, "failed to send SaveTooLateGetPayload request"); + } + } + + pub fn save_delivered_payload(&self, params: SavePayloadParams) { + if let Err(err) = self.sender.try_send(DbRequest::SaveDeliveredPayload { params }) { + error!(%err, "failed to send SaveDeliveredPayload request"); + } + } + + pub fn store_builders_info(&self, builders: Vec) { + if let Err(err) = self.sender.send(DbRequest::StoreBuildersInfo { builders }) { + error!(%err, "failed to send StoreBuildersInfo request"); + } + } + + pub fn db_demote_builder( + &self, + slot: u64, + builder_pub_key: BlsPublicKeyBytes, + block_hash: B256, + reason: String, + failsafe_triggered: Arc, + ) { + if let Err(err) = self.sender.send(DbRequest::DbDemoteBuilder { + slot, + builder_pub_key, + block_hash, + reason, + failsafe_triggered: failsafe_triggered.clone(), + }) { + error!(%err, "failed to send DbDemoteBuilder request triggering failsafe: stopping all optimistic submissions"); + failsafe_triggered.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} {} failed to demote builder in database! Pausing all optmistic submissions", + builder_pub_key, err, block_hash + )); + } + } + + pub fn store_block_submission( + &self, + submission: SignedBidSubmission, + trace: SubmissionTrace, + optimistic_version: OptimisticVersion, + is_adjusted: bool, + ) { + if let Err(err) = self.batch_sender.send(PendingBlockSubmissionValue { + submission, + trace, + optimistic_version, + is_adjusted, + }) { + error!(%err, "failed to store block submission"); + } + } + + pub fn save_get_header_call( + &self, + params: GetHeaderParams, + best_block_hash: B256, + value: U256, + trace: GetHeaderTrace, + mev_boost: bool, + user_agent: Option, + ) { + if let Err(err) = self.sender.send(DbRequest::SaveGetHeaderCall { + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + }) { + error!(%err, "failed to send SaveGetHeaderCall request"); + } + } + + pub fn save_failed_get_payload( + &self, + slot: u64, + block_hash: B256, + error: String, + trace: GetPayloadTrace, + ) { + if let Err(err) = + self.sender.send(DbRequest::SaveFailedGetPayload { slot, block_hash, error, trace }) + { + error!(%err, "failed to send SaveFailedGetPayload request"); + } + } + + pub fn save_gossiped_payload_trace(&self, block_hash: B256, trace: GossipedPayloadTrace) { + if let Err(err) = + self.sender.send(DbRequest::SaveGossipedPayloadTrace { block_hash, trace }) + { + error!(%err, "failed to send SaveGossipedPayloadTrace request"); + } + } + + pub fn save_inclusion_list( + &self, + inclusion_list: InclusionListWithMetadata, + slot: u64, + block_parent_hash: B256, + proposer_pubkey: BlsPublicKeyBytes, + ) { + if let Err(err) = self.sender.send(DbRequest::SaveInclusionList { + inclusion_list, + slot, + block_parent_hash, + proposer_pubkey, + }) { + error!(%err, "failed to send SaveInclusionList request"); + } + } + + pub fn save_block_adjustments_data(&self, entry: DataAdjustmentsEntry) { + if let Err(err) = self.sender.send(DbRequest::SaveBlockAdjustmentsData { entry }) { + error!(%err, "failed to send SaveBlockAdjustmentsData request"); + } + } + + pub fn set_known_validators(&self, known_validators: Vec) { + if let Err(err) = self.sender.try_send(DbRequest::SetKnownValidators { known_validators }) { + error!(%err, "failed to send SetKnownValidators request"); + } + } + + pub fn set_proposer_duties(&self, duties: Vec) { + if let Err(err) = self.sender.try_send(DbRequest::SetProposerDuties { duties }) { + error!(%err, "failed to send SetProposerDuties request"); + } + } + + pub fn disable_adjustments( + &self, + block_hash: B256, + failsafe_trigger: Arc, + adjustments_enabled: Arc, + ) { + if let Err(err) = self.sender.try_send(DbRequest::DisableAdjustments { + block_hash, + failsafe_trigger: failsafe_trigger.clone(), + adjustments_enabled: adjustments_enabled.clone(), + }) { + error!(%err, "failed to send DisableAdjustments request triggering failsafe: stopping all adjustments"); + failsafe_trigger.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} failed to disable adjustments in database! Pausing all adjustments", + block_hash, err + )); + } + } + + pub fn save_merged_blocks(&self, blocks: Vec) { + if let Err(err) = self.sender.try_send(DbRequest::SaveMergedBlocks { blocks }) { + error!(%err, "failed to send SaveMergedBlocks request"); + } + } +} diff --git a/crates/relay/src/database/mod.rs b/crates/relay/src/database/mod.rs index dbbbd73b..91915a8a 100644 --- a/crates/relay/src/database/mod.rs +++ b/crates/relay/src/database/mod.rs @@ -1,4 +1,5 @@ pub mod error; +pub mod handle; pub mod postgres; pub mod types; @@ -10,15 +11,21 @@ use std::{ time::Duration, }; -use helix_common::{RelayConfig, is_local_dev}; +use helix_common::{RelayConfig, is_local_dev, local_cache}; use postgres::postgres_db_service::PostgresDatabaseService; pub use types::*; +pub use crate::database::postgres::postgres_db_service::{DbRequest, PendingBlockSubmissionValue}; + pub async fn start_db_service( config: &RelayConfig, known_validators_loaded: Arc, + db_request_receiver: crossbeam_channel::Receiver, + db_batch_request_receiver: crossbeam_channel::Receiver, + local_cache: Arc, ) -> eyre::Result> { - let mut postgres_db = PostgresDatabaseService::from_relay_config(config).await; + let mut postgres_db = + PostgresDatabaseService::from_relay_config(config, local_cache.clone()).await; if !is_local_dev() { postgres_db.init_forever().await; @@ -34,8 +41,13 @@ pub async fn start_db_service( tokio::spawn({ let known_validators_loaded = known_validators_loaded.clone(); let postgres_db = postgres_db.clone(); + let local_cache = local_cache.clone(); async move { postgres_db.load_known_validators().await; + postgres_db.load_validator_registrations().await; + postgres_db.load_builder_infos(local_cache.clone()).await; + postgres_db.load_trusted_proposers(local_cache.clone()).await; + postgres_db.load_validator_pools().await; known_validators_loaded.store(true, Ordering::Relaxed); if should_refresh_cache { @@ -46,14 +58,17 @@ pub async fn start_db_service( loop { tick.tick().await; postgres_db.load_known_validators().await; + postgres_db.load_validator_registrations().await; + postgres_db.load_builder_infos(local_cache.clone()).await; + postgres_db.load_trusted_proposers(local_cache.clone()).await; + postgres_db.load_validator_pools().await; } } } }); } - //postgres_db.load_validator_registrations().await; - postgres_db.start_processors().await; + postgres_db.start_processors(db_request_receiver, db_batch_request_receiver).await; Ok(Arc::new(postgres_db)) } diff --git a/crates/relay/src/database/postgres/postgres_db_service.rs b/crates/relay/src/database/postgres/postgres_db_service.rs index b7cc4fa2..3d89d1bc 100644 --- a/crates/relay/src/database/postgres/postgres_db_service.rs +++ b/crates/relay/src/database/postgres/postgres_db_service.rs @@ -2,36 +2,32 @@ use std::{ ops::DerefMut, sync::{ Arc, - atomic::{AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, }, time::{Duration, SystemTime}, }; use alloy_primitives::{B256, U256}; -use dashmap::{DashMap, DashSet}; use deadpool_postgres::{Config, GenericClient, ManagerConfig, Pool, RecyclingMethod}; use helix_common::{ - BuilderInfo, DataAdjustmentsEntry, Filtering, GetHeaderTrace, GetPayloadTrace, - GossipedPayloadTrace, PostgresConfig, ProposerInfo, RelayConfig, - SignedValidatorRegistrationEntry, SubmissionTrace, ValidatorPreferences, ValidatorSummary, + DataAdjustmentsEntry, Filtering, GetHeaderTrace, GetPayloadTrace, GossipedPayloadTrace, + PostgresConfig, ProposerInfo, RelayConfig, SignedValidatorRegistrationEntry, SubmissionTrace, + ValidatorPreferences, ValidatorSummary, api::{ builder_api::{BuilderGetValidatorsResponseEntry, InclusionListWithMetadata}, data_api::{ BidFilters, DataAdjustmentsResponse, MergedBlockResponse, ProposerHeaderDeliveredParams, ProposerHeaderDeliveredResponse, }, - proposer_api::{GetHeaderParams, ValidatorRegistrationInfo}, + proposer_api::GetHeaderParams, }, bid_submission::OptimisticVersion, + local_cache::LocalCache, metrics::DbMetricRecord, - utils::utcnow_ms, + utils::{alert_discord, utcnow_ms}, }; -use helix_types::{ - BlsPublicKeyBytes, MergedBlock, SignedBidSubmission, SignedValidatorRegistration, Slot, -}; -use parking_lot::RwLock; +use helix_types::{BlsPublicKeyBytes, MergedBlock, SignedBidSubmission, Slot}; use rustc_hash::FxHashSet; -use tokio::sync::mpsc::Sender; use tokio_postgres::{NoTls, types::ToSql}; use tracing::{error, info, instrument, warn}; @@ -48,7 +44,71 @@ use crate::database::{ }, }; -struct PendingBlockSubmissionValue { +pub enum DbRequest { + SaveTooLateGetPayload { + slot: u64, + proposer_pub_key: BlsPublicKeyBytes, + payload_hash: B256, + message_received: u64, + payload_fetched: u64, + }, + SaveDeliveredPayload { + params: SavePayloadParams, + }, + StoreBuildersInfo { + builders: Vec, + }, + DbDemoteBuilder { + slot: u64, + builder_pub_key: BlsPublicKeyBytes, + block_hash: B256, + reason: String, + failsafe_triggered: Arc, + }, + SaveGetHeaderCall { + params: GetHeaderParams, + best_block_hash: B256, + value: U256, + trace: GetHeaderTrace, + mev_boost: bool, + user_agent: Option, + }, + SaveFailedGetPayload { + slot: u64, + block_hash: B256, + error: String, + trace: GetPayloadTrace, + }, + SaveGossipedPayloadTrace { + block_hash: B256, + trace: GossipedPayloadTrace, + }, + SaveInclusionList { + inclusion_list: InclusionListWithMetadata, + slot: u64, + block_parent_hash: B256, + proposer_pubkey: BlsPublicKeyBytes, + }, + SaveBlockAdjustmentsData { + entry: DataAdjustmentsEntry, + }, + SetKnownValidators { + known_validators: Vec, + }, + SetProposerDuties { + duties: Vec, + }, + DisableAdjustments { + block_hash: B256, + failsafe_trigger: Arc, + adjustments_enabled: Arc, + }, + SaveMergedBlocks { + blocks: Vec, + }, +} + +pub struct PendingBlockSubmissionValue { pub submission: SignedBidSubmission, pub trace: SubmissionTrace, pub optimistic_version: OptimisticVersion, @@ -57,6 +117,7 @@ struct PendingBlockSubmissionValue { const BLOCK_SUBMISSION_FIELD_COUNT: usize = 17; const MAINNET_VALIDATOR_COUNT: usize = 1_100_000; +const DB_CHECK_INTERVAL: Duration = Duration::from_secs(1); static DELIVERED_PAYLOADS_MIG_SLOT: AtomicU64 = AtomicU64::new(0); fn new_validator_set() -> FxHashSet { @@ -88,18 +149,17 @@ struct TrustedProposerParams { #[derive(Clone)] pub struct PostgresDatabaseService { - validator_registration_cache: Arc>, - pending_validator_registrations: Arc>, - block_submissions_sender: Option>, - known_validators_cache: Arc>>, - validator_pool_cache: Arc>, region: i16, pub pool: Arc, pub high_priority_pool: Arc, + pub local_cache: Arc, } impl PostgresDatabaseService { - pub async fn from_relay_config(relay_config: &RelayConfig) -> Self { + pub async fn from_relay_config( + relay_config: &RelayConfig, + local_cache: Arc, + ) -> Self { let mut cfg = Config::new(); cfg.host = Some(relay_config.postgres.hostname.clone()); cfg.port = Some(relay_config.postgres.port); @@ -129,14 +189,10 @@ impl PostgresDatabaseService { }; PostgresDatabaseService { - validator_registration_cache: Arc::new(DashMap::new()), - pending_validator_registrations: Arc::new(DashSet::new()), - block_submissions_sender: None, - known_validators_cache: Arc::new(RwLock::new(new_validator_set())), - validator_pool_cache: Arc::new(DashMap::new()), region: relay_config.postgres.region, pool: Arc::new(pool), high_priority_pool: Arc::new(high_priority_pool), + local_cache, } } @@ -194,16 +250,36 @@ impl PostgresDatabaseService { pub async fn load_known_validators(&self) { let mut record = DbMetricRecord::new("load_known_validators"); - let client = self.pool.get().await.unwrap(); - let rows = client.query("SELECT * FROM known_validators", &[]).await.unwrap(); + let client = match self.pool.get().await { + Ok(client) => client, + Err(e) => { + error!("Error getting client from pool: {}", e); + return; + } + }; + + let rows = match client.query("SELECT * FROM known_validators", &[]).await { + Ok(rows) => rows, + Err(e) => { + error!("Error querying known_validators: {}", e); + return; + } + }; + let mut set = new_validator_set(); for row in rows { let public_key: BlsPublicKeyBytes = - parse_bytes_to_pubkey_bytes(row.get::<&str, &[u8]>("public_key")).unwrap(); + match parse_bytes_to_pubkey_bytes(row.get::<&str, &[u8]>("public_key")) { + Ok(pk) => pk, + Err(e) => { + error!("Error parsing public key: {}", e); + continue; + } + }; set.insert(public_key); } - *self.known_validators_cache.write() = set; + *self.local_cache.known_validators_cache.write() = set; record.record_success(); } @@ -216,7 +292,8 @@ impl PostgresDatabaseService { Ok(entries) => { let num_entries = entries.len(); entries.into_iter().for_each(|entry| { - self.validator_registration_cache + self.local_cache + .validator_registration_cache .insert(entry.registration_info.registration.message.pubkey, entry); }); info!("Loaded {} validator registrations", num_entries); @@ -228,9 +305,71 @@ impl PostgresDatabaseService { } } - pub async fn start_processors(&mut self) { + pub async fn start_processors( + &mut self, + db_request_receiver: crossbeam_channel::Receiver, + block_submission_receiver: crossbeam_channel::Receiver, + ) { self.start_registration_processor().await; - self.start_block_submission_processor().await; + self.start_block_submission_processor(block_submission_receiver).await; + self.start_db_request_processor(db_request_receiver).await; + self.start_check_flag_task(); + } + + fn start_check_flag_task(&mut self) { + let svc_clone = self.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(DB_CHECK_INTERVAL); + loop { + interval.tick().await; + + if svc_clone.local_cache.adjustments_failsafe_trigger.load(Ordering::Relaxed) { + svc_clone.local_cache.adjustments_enabled.store(false, Ordering::Relaxed); + return; + } + + match svc_clone.check_adjustments_enabled().await { + Ok(value) => { + let previous = svc_clone + .local_cache + .adjustments_enabled + .swap(value, Ordering::Relaxed); + if previous != value { + tracing::info!( + "adjustments enabled flag changed from {} to {}", + previous, + value + ); + } + } + Err(e) => tracing::error!("failed to check adjustments_enabled flag: {}", e), + } + } + }); + } + + pub async fn start_db_request_processor( + &mut self, + receiver: crossbeam_channel::Receiver, + ) { + let svc_clone = self.clone(); + + tokio::spawn(async move { + loop { + match receiver.recv() { + Ok(request) => { + let svc = svc_clone.clone(); + tokio::spawn(async move { + svc.handle_db_request(request).await; + }); + } + Err(e) => { + error!("DB request receiver error: {}", e); + break; + } + } + } + }); } pub async fn start_registration_processor(&self) { @@ -239,12 +378,13 @@ impl PostgresDatabaseService { tokio::spawn(async move { loop { interval.tick().await; - match self_clone.pending_validator_registrations.len() { + match self_clone.local_cache.pending_validator_registrations.len() { 0 => continue, _ => { let mut entries = Vec::new(); - for key in self_clone.pending_validator_registrations.iter() { - if let Some(entry) = self_clone.validator_registration_cache.get(&*key) + for key in self_clone.local_cache.pending_validator_registrations.iter() { + if let Some(entry) = + self_clone.local_cache.validator_registration_cache.get(&*key) { entries.push(entry.clone()); } @@ -252,7 +392,7 @@ impl PostgresDatabaseService { match self_clone._save_validator_registrations(&entries).await { Ok(_) => { for entry in entries.iter() { - self_clone.pending_validator_registrations.remove( + self_clone.local_cache.pending_validator_registrations.remove( &entry.registration_info.registration.message.pubkey, ); } @@ -268,10 +408,10 @@ impl PostgresDatabaseService { }); } - pub async fn start_block_submission_processor(&mut self) { - let (block_submissions_sender, mut block_submissions_receiver) = - tokio::sync::mpsc::channel(10_000); - self.block_submissions_sender = Some(block_submissions_sender); + pub async fn start_block_submission_processor( + &mut self, + batch_receiver: crossbeam_channel::Receiver, + ) { let svc_clone = self.clone(); tokio::spawn(async move { let mut batch = Vec::with_capacity(2_000); @@ -279,10 +419,12 @@ impl PostgresDatabaseService { let mut last_slot_processed = 0; loop { tokio::select! { - Some(item) = block_submissions_receiver.recv() => { - batch.push(item); - } _ = ticker.tick() => { + // Drain all available messages from the crossbeam channel + while let Ok(item) = batch_receiver.try_recv() { + batch.push(item); + } + if !batch.is_empty() { let mut retry_count = 0; @@ -311,6 +453,172 @@ impl PostgresDatabaseService { }); } + pub async fn load_builder_infos(&self, local_cache: Arc) { + let mut record = DbMetricRecord::new("load_builder_infos"); + + match self.get_all_builder_infos().await { + Ok(builder_infos) => { + local_cache.update_builder_infos(&builder_infos, true); + info!("Loaded {} builder infos", builder_infos.len()); + record.record_success(); + } + Err(e) => { + error!("Error loading builder infos: {}", e); + } + } + } + + pub async fn load_trusted_proposers(&self, local_cache: Arc) { + let mut record = DbMetricRecord::new("load_trusted_proposers"); + + match self.get_trusted_proposers().await { + Ok(proposers) => { + let num_proposers = proposers.len(); + local_cache.update_trusted_proposers(proposers); + info!("Loaded {} trusted proposers", num_proposers); + record.record_success(); + } + Err(e) => { + error!("Error loading trusted proposers: {}", e); + } + } + } + + async fn handle_db_request(&self, request: DbRequest) { + match request { + DbRequest::SaveTooLateGetPayload { + slot, + proposer_pub_key, + payload_hash, + message_received, + payload_fetched, + } => { + if let Err(err) = self + .save_too_late_get_payload( + slot, + &proposer_pub_key, + &payload_hash, + message_received, + payload_fetched, + ) + .await + { + error!(%err, "failed to save too late get payload"); + } + } + DbRequest::SaveDeliveredPayload { params } => { + if let Err(err) = self.save_delivered_payload(¶ms).await { + error!(%err, "error saving payload to database"); + } + } + DbRequest::StoreBuildersInfo { builders } => { + if let Err(err) = self.store_builders_info(&builders).await { + error!(%err, "failed to store builders info"); + } + } + DbRequest::DbDemoteBuilder { + slot, + builder_pub_key, + block_hash, + reason, + failsafe_triggered, + } => { + if let Err(err) = + self.db_demote_builder(slot, &builder_pub_key, &block_hash, reason).await + { + error!(%err, "error demoting builder in database triggering failsafe: stopping all optimistic submissions"); + failsafe_triggered.store(true, Ordering::Relaxed); + alert_discord(&format!( + "{} {} {} failed to demote builder in database! Pausing all optmistic submissions", + builder_pub_key, err, block_hash + )); + } + } + DbRequest::SaveGetHeaderCall { + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + } => { + if let Err(err) = self + .save_get_header_call( + params, + best_block_hash, + value, + trace, + mev_boost, + user_agent, + ) + .await + { + error!(%err, "error saving get header call to database"); + } + } + DbRequest::SaveFailedGetPayload { slot, block_hash, error, trace } => { + if let Err(err) = self.save_failed_get_payload(slot, block_hash, error, trace).await + { + error!(err = ?err, "error saving failed get payload"); + } + } + DbRequest::SaveGossipedPayloadTrace { block_hash, trace } => { + if let Err(err) = self.save_gossiped_payload_trace(block_hash, trace).await { + error!(%err, "failed to store gossiped payload trace") + } + } + DbRequest::SaveInclusionList { + inclusion_list, + slot, + block_parent_hash, + proposer_pubkey, + } => { + if let Err(err) = self + .save_inclusion_list( + &inclusion_list, + slot, + &block_parent_hash, + &proposer_pubkey, + ) + .await + { + error!(%slot, "failed to save inclusion list Errors: {:?}", err); + } + } + DbRequest::SaveBlockAdjustmentsData { entry } => { + if let Err(err) = self.save_block_adjustments_data(entry).await { + error!(%err, "failed to save block adjustments data"); + } + } + DbRequest::SetKnownValidators { known_validators } => { + if let Err(err) = self.set_known_validators(known_validators).await { + error!(%err, "failed to set known validators"); + } + } + DbRequest::SetProposerDuties { duties } => { + if let Err(err) = self.set_proposer_duties(duties).await { + error!(%err, "failed to set proposer duties"); + } + } + DbRequest::DisableAdjustments { block_hash, failsafe_trigger, adjustments_enabled } => { + if let Err(err) = self.disable_adjustments().await { + failsafe_trigger.store(true, Ordering::Relaxed); + adjustments_enabled.store(false, Ordering::Relaxed); + error!(%block_hash, %err, "failed to disable adjustments in database, pulling the failsafe trigger"); + alert_discord(&format!( + "{} {} failed to disable adjustments in database, pulling the failsafe trigger", + err, block_hash + )); + } + } + DbRequest::SaveMergedBlocks { blocks } => { + if let Err(err) = self.save_merged_blocks(&blocks).await { + error!(%err, "failed to save merged blocks"); + } + } + } + } + async fn _save_validator_registrations( &self, entries: &[SignedValidatorRegistrationEntry], @@ -647,59 +955,15 @@ impl Default for PostgresDatabaseService { let high_priority_pool = cfg.create_pool(None, NoTls).unwrap(); PostgresDatabaseService { - validator_registration_cache: Arc::new(DashMap::new()), - pending_validator_registrations: Arc::new(DashSet::new()), - block_submissions_sender: None, - known_validators_cache: Arc::new(RwLock::new(new_validator_set())), - validator_pool_cache: Arc::new(DashMap::new()), region: 1, pool: Arc::new(pool), high_priority_pool: Arc::new(high_priority_pool), + local_cache: Arc::new(LocalCache::new()), } } } impl PostgresDatabaseService { - /// Assume the entries are already validated - pub fn save_validator_registrations( - &self, - entries: impl Iterator, - pool_name: Option, - user_agent: Option, - ) { - for entry in entries { - self.pending_validator_registrations.insert(entry.registration.message.pubkey); - self.validator_registration_cache.insert( - entry.registration.message.pubkey, - SignedValidatorRegistrationEntry::new( - entry.clone(), - pool_name.clone(), - user_agent.clone(), - ), - ); - } - } - - pub fn is_registration_update_required( - &self, - registration: &SignedValidatorRegistration, - ) -> bool { - if let Some(existing_entry) = - self.validator_registration_cache.get(®istration.message.pubkey) && - existing_entry.registration_info.registration.message.timestamp >= - registration.message.timestamp.saturating_sub(60 * 60) && - existing_entry.registration_info.registration.message.fee_recipient == - registration.message.fee_recipient && - existing_entry.registration_info.registration.message.gas_limit == - registration.message.gas_limit - { - // do registration once per hour, unless fee recipient / gas limit has changed - - return false; - } - true - } - #[instrument(skip_all)] pub async fn get_validator_registration( &self, @@ -743,7 +1007,7 @@ impl PostgresDatabaseService { } #[instrument(skip_all)] - pub async fn get_validator_registrations( + async fn get_validator_registrations( &self, ) -> Result, DatabaseError> { let mut record = DbMetricRecord::new("get_validator_registrations"); @@ -874,177 +1138,6 @@ impl PostgresDatabaseService { Ok(()) } - #[instrument(skip_all)] - pub async fn get_proposer_duties( - &self, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_proposer_duties"); - - let rows = self - .high_priority_pool - .get() - .await? - .query( - " - SELECT * FROM proposer_duties - INNER JOIN validator_registrations - ON proposer_duties.public_key = validator_registrations.public_key - INNER JOIN validator_preferences - ON proposer_duties.public_key = validator_preferences.public_key - WHERE validator_registrations.active = true - ", - &[], - ) - .await?; - - record.record_success(); - parse_rows(rows) - } - - #[instrument(skip_all)] - pub async fn get_validator_preferences( - &self, - pub_key: &BlsPublicKeyBytes, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_validator_preferences"); - - if let Some(cached_entry) = self.validator_registration_cache.get(pub_key) { - record.record_success(); - return Ok(Some(cached_entry.registration_info.preferences.clone())); - } - - let rows = self - .pool - .get() - .await? - .query( - " - SELECT - validator_preferences.filtering, - validator_preferences.trusted_builders, - validator_preferences.header_delay, - validator_preferences.delay_ms, - validator_preferences.disable_inclusion_lists - FROM validator_preferences - WHERE validator_preferences.public_key = $1 - ", - &[&(pub_key.as_slice())], - ) - .await?; - - let result = if rows.is_empty() { - None - } else { - let prefs: ValidatorPreferences = parse_row(&rows[0])?; - Some(prefs) - }; - - record.record_success(); - Ok(result) - } - - pub async fn update_validator_preferences( - &self, - pub_key: &BlsPublicKeyBytes, - preferences: &ValidatorPreferences, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("update_validator_preferences"); - - let mut client = self.pool.get().await?; - let transaction = client.transaction().await?; - - let trusted_builders: Option> = - preferences.trusted_builders.as_ref().map(|builders| builders.to_vec()); - - let filtering_value: i16 = match preferences.filtering { - helix_common::Filtering::Regional => 1, - helix_common::Filtering::Global => 0, - }; - - transaction - .execute( - " - INSERT INTO validator_preferences ( - public_key, - filtering, - trusted_builders, - header_delay, - delay_ms, - disable_inclusion_lists, - manual_override - ) VALUES ($1, $2, $3, $4, $5, $6, TRUE) - ON CONFLICT (public_key) - DO UPDATE SET - filtering = EXCLUDED.filtering, - trusted_builders = EXCLUDED.trusted_builders, - header_delay = EXCLUDED.header_delay, - delay_ms = EXCLUDED.delay_ms, - disable_inclusion_lists = EXCLUDED.disable_inclusion_lists, - manual_override = TRUE - ", - &[ - &pub_key.as_slice(), - &filtering_value, - &trusted_builders, - &preferences.header_delay, - &preferences.delay_ms.map(|v| v as i64), - &preferences.disable_inclusion_lists, - ], - ) - .await?; - - transaction.commit().await?; - - if let Some(mut cached_entry) = self.validator_registration_cache.get_mut(pub_key) { - cached_entry.registration_info.preferences = preferences.clone(); - } - - record.record_success(); - - Ok(()) - } - - #[instrument(skip_all)] - pub async fn get_pool_validators( - &self, - pool_name: &str, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_pool_validators"); - - let rows = self - .pool - .get() - .await? - .query( - " - SELECT public_key - FROM trusted_proposers - WHERE name = $1 - ", - &[&pool_name], - ) - .await?; - - let mut validators = Vec::new(); - for row in rows { - let pubkey_bytes: Vec = row.try_get("public_key")?; - - if pubkey_bytes.len() == 48 { - let mut key = [0u8; 48]; - key.copy_from_slice(&pubkey_bytes); - validators.push(BlsPublicKeyBytes::from(key)); - } else { - error!( - length = pubkey_bytes.len(), - "Invalid validator pubkey length in trusted_proposers" - ); - } - } - - record.record_success(); - Ok(validators) - } - pub async fn set_known_validators( &self, known_validators: Vec, @@ -1058,7 +1151,7 @@ impl PostgresDatabaseService { let keys_to_remove: Vec; { - let mut curr_known_guard = self.known_validators_cache.write(); + let mut curr_known_guard = self.local_cache.known_validators_cache.write(); info!("Known validators: current cache size: {:?}", curr_known_guard.len()); keys_to_add = new_keys_set.difference(&curr_known_guard).cloned().collect(); @@ -1113,54 +1206,32 @@ impl PostgresDatabaseService { Ok(()) } - pub fn known_validators_cache(&self) -> &Arc>> { - &self.known_validators_cache - } - #[instrument(skip_all)] - pub async fn get_validator_pool_name( - &self, - api_key: &str, - ) -> Result, DatabaseError> { - let mut record = DbMetricRecord::new("get_validator_pool_name"); - - let client = self.high_priority_pool.get().await?; - - if self.validator_pool_cache.is_empty() { - let rows = client.query("SELECT * FROM validator_pools", &[]).await?; - for row in rows { - let api_key: String = row.get::<&str, &str>("api_key").to_string(); - let name: String = row.get::<&str, &str>("name").to_string(); - self.validator_pool_cache.insert(api_key, name); - } - } + pub async fn load_validator_pools(&self) { + let mut record = DbMetricRecord::new("load_validator_pools"); - if self.validator_pool_cache.contains_key(api_key) { - return Ok(self.validator_pool_cache.get(api_key).map(|f| f.clone())); - } - - let api_key = api_key.to_string(); - let rows = match client - .query("SELECT * FROM validator_pools WHERE api_key = $1", &[&api_key]) - .await - { + let client = match self.high_priority_pool.get().await { + Ok(client) => client, + Err(e) => { + error!("Error getting client from pool: {}", e); + return; + } + }; + let rows = match client.query("SELECT * FROM validator_pools", &[]).await { Ok(rows) => rows, Err(e) => { error!("Error querying validator_pools: {}", e); - return Err(DatabaseError::from(e)); + return; } }; - if rows.is_empty() { - return Ok(None); + for row in rows { + let api_key: String = row.get::<&str, &str>("api_key").to_string(); + let name: String = row.get::<&str, &str>("name").to_string(); + self.local_cache.validator_pool_cache.insert(api_key, name); } - let name: String = rows[0].get("name"); - - self.validator_pool_cache.insert(api_key.to_string(), name.clone()); - record.record_success(); - Ok(Some(name)) } #[instrument(skip_all)] @@ -1370,68 +1441,6 @@ impl PostgresDatabaseService { Ok(()) } - #[instrument(skip_all)] - pub async fn store_block_submission( - &self, - submission: SignedBidSubmission, - trace: SubmissionTrace, - optimistic_version: OptimisticVersion, - is_adjusted: bool, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("store_block_submission"); - if let Some(sender) = &self.block_submissions_sender { - sender - .send(PendingBlockSubmissionValue { - submission, - trace, - optimistic_version, - is_adjusted, - }) - .await - .map_err(|_| DatabaseError::ChannelSendError)?; - } else { - return Err(DatabaseError::ChannelSendError); - } - record.record_success(); - Ok(()) - } - - #[instrument(skip_all)] - pub async fn store_builder_info( - &self, - builder_pub_key: &BlsPublicKeyBytes, - builder_info: &BuilderInfo, - ) -> Result<(), DatabaseError> { - let mut record = DbMetricRecord::new("store_builder_info"); - - info!("Storing builder info for {:?}", builder_pub_key); - - self.pool - .get() - .await? - .execute( - " - INSERT INTO builder_info (public_key, collateral, is_optimistic, is_optimistic_for_regional_filtering, builder_id, builder_ids) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (public_key) - DO UPDATE SET - builder_ids = array_concat_uniq(COALESCE(builder_info.builder_ids, '{}'::character varying[]), EXCLUDED.builder_ids) - ", - &[ - &(builder_pub_key.as_slice()), - &(PostgresNumeric::from(builder_info.collateral)), - &(builder_info.is_optimistic), - &(builder_info.is_optimistic_for_regional_filtering), - &(builder_info.builder_id), - &(builder_info.builder_ids), - ], - ) - .await?; - - record.record_success(); - Ok(()) - } - #[instrument(skip_all)] pub async fn store_builders_info( &self, @@ -1521,7 +1530,7 @@ impl PostgresDatabaseService { } #[instrument(skip_all)] - pub async fn get_all_builder_infos(&self) -> Result, DatabaseError> { + async fn get_all_builder_infos(&self) -> Result, DatabaseError> { let mut record = DbMetricRecord::new("get_all_builder_infos"); let rows = @@ -1531,18 +1540,6 @@ impl PostgresDatabaseService { parse_rows(rows) } - #[instrument(skip_all)] - pub async fn check_builder_api_key(&self, api_key: &str) -> Result { - let mut record = DbMetricRecord::new("check_builder_api_key"); - - let client = self.high_priority_pool.get().await?; - let rows = - client.query("SELECT * FROM builder_info WHERE api_key = $1", &[&(api_key)]).await?; - - record.record_success(); - Ok(!rows.is_empty()) - } - #[instrument(skip_all)] pub async fn db_demote_builder( &self, diff --git a/crates/relay/src/database/postgres/postgres_db_service_tests.rs b/crates/relay/src/database/postgres/postgres_db_service_tests.rs index 5f753d6f..2fb1cb14 100644 --- a/crates/relay/src/database/postgres/postgres_db_service_tests.rs +++ b/crates/relay/src/database/postgres/postgres_db_service_tests.rs @@ -209,56 +209,6 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_save_and_get_builder_info() { - run_setup().await; - - let db_service = PostgresDatabaseService::new(&test_config(), 0).unwrap(); - - let public_key = BlsPublicKey::deserialize(&alloy_primitives::hex!("8C266FD5CB50B5D9431DAA69C4BE17BC9A79A85D172112DA09E0AC3E2D0DCF785021D49B6DF57827D6BC61EBA086A507")).unwrap().serialize().into(); - let builder_info = helix_common::BuilderInfo { - collateral: U256::from(10000000000000000000u64), - is_optimistic: false, - is_optimistic_for_regional_filtering: false, - builder_id: None, - builder_ids: Some(vec!["test3".to_string()]), - api_key: None, - }; - - let result = db_service.store_builder_info(&public_key, &builder_info).await; - assert!(result.is_ok()); - - let result = db_service.get_all_builder_infos().await.unwrap(); - assert!(result.len() == 1); - assert!(result[0].pub_key == public_key); - } - - #[tokio::test] - async fn test_demotion() { - run_setup().await; - - let db_service = PostgresDatabaseService::new(&test_config(), 0).unwrap(); - - let key = BlsSecretKey::random(); - let public_key = key.public_key().serialize().into(); - - let builder_info = helix_common::BuilderInfo { - collateral: Default::default(), - is_optimistic: false, - is_optimistic_for_regional_filtering: false, - builder_id: None, - builder_ids: None, - api_key: None, - }; - - let result = db_service.store_builder_info(&public_key, &builder_info).await; - assert!(result.is_ok()); - - let result = - db_service.db_demote_builder(0, &public_key, &Default::default(), "".to_string()).await; - assert!(result.is_ok()); - } - #[tokio::test] async fn test_store_block_submission() -> Result<(), Box> { run_setup().await; diff --git a/crates/relay/src/housekeeper/error.rs b/crates/relay/src/housekeeper/error.rs index ae11793c..49348b9a 100644 --- a/crates/relay/src/housekeeper/error.rs +++ b/crates/relay/src/housekeeper/error.rs @@ -13,4 +13,7 @@ pub enum HousekeeperError { #[error("auctioneer error. {0}")] Auctioneer(#[from] AuctioneerError), + + #[error("oneshot channel recv error. {0}")] + Channel(#[from] tokio::sync::oneshot::error::RecvError), } diff --git a/crates/relay/src/housekeeper/housekeeper.rs b/crates/relay/src/housekeeper/housekeeper.rs index 3beba2ff..2dfabc58 100644 --- a/crates/relay/src/housekeeper/housekeeper.rs +++ b/crates/relay/src/housekeeper/housekeeper.rs @@ -19,13 +19,13 @@ use tokio::sync::{Mutex, broadcast}; use tracing::{Instrument, debug, error, info, warn}; use crate::{ + DbHandle, auctioneer::Event, beacon::{ error::BeaconClientError, multi_beacon_client::MultiBeaconClient, types::{HeadEventData, StateId}, }, - database::postgres::postgres_db_service::PostgresDatabaseService, housekeeper::{ EthereumPrimevService, error::HousekeeperError, inclusion_list::InclusionListService, }, @@ -34,8 +34,6 @@ use crate::{ const PROPOSER_DUTIES_UPDATE_FREQ: u64 = 1; -const TRUSTED_PROPOSERS_UPDATE_FREQ: u64 = 5; - const CUTOFF_TIME: Duration = Duration::from_secs(4); // Constants for known validators refresh logic. @@ -47,9 +45,7 @@ struct HousekeeperSlots { head: Arc, proposer_duties: Arc, refreshed_validators: Arc, - trusted_proposers: Arc, // updating can take >> slot time, so we avoid concurrent updates by locking the mutex - updating_builder_infos: Arc>, updating_proposer_duties: Arc>, updating_refreshed_validators: Arc>, updating_trusted_proposers: Arc>, @@ -75,12 +71,6 @@ impl HousekeeperSlots { fn update_refreshed_validators(&self, head_slot: Slot) { self.refreshed_validators.store(head_slot.as_u64(), Ordering::Relaxed); } - fn trusted_proposers(&self) -> u64 { - self.trusted_proposers.load(Ordering::Relaxed) - } - fn update_trusted_proposers(&self, head_slot: Slot) { - self.trusted_proposers.store(head_slot.as_u64(), Ordering::Relaxed); - } } /// Housekeeper Service. @@ -91,19 +81,18 @@ impl HousekeeperSlots { /// will sync through db. #[derive(Clone)] pub struct Housekeeper { - db: Arc, + db: DbHandle, beacon_client: Arc, auctioneer: Arc, chain_info: Arc, primev_service: Option, slots: HousekeeperSlots, inclusion_list_service: Option, - local_builders: Vec, } impl Housekeeper { pub fn new( - db: Arc, + db: DbHandle, beacon_client: Arc, auctioneer: Arc, config: &RelayConfig, @@ -133,7 +122,6 @@ impl Housekeeper { primev_service, slots: HousekeeperSlots::default(), inclusion_list_service, - local_builders: config.builders.clone(), } } @@ -239,25 +227,6 @@ impl Housekeeper { } } - // builder info updated every slot - let housekeeper = self.clone(); - task::spawn( - file!(), - line!(), - async move { - let Ok(_guard) = housekeeper.slots.updating_builder_infos.try_lock() else { - warn!("builder info update already in progress"); - return; - }; - let start = Instant::now(); - if let Err(err) = housekeeper.sync_builder_info_changes().await { - error!(%err, "failed to sync builder info changes"); - } - info!(duration = ?start.elapsed(), "sync builder info task completed"); - } - .in_current_span(), - ); - // save merged blocks let housekeeper = self.clone(); spawn_tracked!(async move { @@ -267,39 +236,10 @@ impl Housekeeper { }; let start = Instant::now(); let merged_blocks = housekeeper.auctioneer.get_merged_blocks(); - if let Err(err) = housekeeper.db.save_merged_blocks(&merged_blocks).await { - error!(%err, "failed to save merged blocks"); - } + housekeeper.db.save_merged_blocks(merged_blocks); housekeeper.auctioneer.clear_merged_blocks(); info!(duration = ?start.elapsed(), "save merged blocks task completed"); }); - - // trusted proposers - if self.should_update_trusted_proposers(head_slot.as_u64()) { - if is_local_dev() { - warn!("skipping refresh of trusted proposers") - } else { - let housekeeper = self.clone(); - task::spawn( - file!(), - line!(), - async move { - let Ok(_guard) = housekeeper.slots.updating_trusted_proposers.try_lock() else { - warn!("trusted proposer update already in progress"); - return; - }; - let start = Instant::now(); - if let Err(err) = housekeeper.update_trusted_proposers().await { - error!(%err, "failed to update trusted proposers"); - } else { - housekeeper.slots.update_trusted_proposers(head_slot); - } - info!(duration = ?start.elapsed(), "update trusted proposers task completed"); - } - .in_current_span(), - ); - } - } } #[tracing::instrument(skip_all, name = "update_proposer_duties", fields(epoch = %epoch))] @@ -377,21 +317,7 @@ impl Housekeeper { let validators = self.beacon_client.get_state_validators(StateId::Head).await?; debug!(validators = validators.len(), duration = ?start.elapsed(), "fetched validators"); - self.db.set_known_validators(validators).await?; - - Ok(()) - } - - /// Synchronizes builder information changes. - async fn sync_builder_info_changes(&self) -> Result<(), HousekeeperError> { - let mut builder_infos = self.db.get_all_builder_infos().await?; - debug!(builder_infos = builder_infos.len(), "updating builder infos"); - - if is_local_dev() { - builder_infos.extend_from_slice(self.local_builders.as_slice()); - } - - self.auctioneer.update_builder_infos(&builder_infos, true); + self.db.set_known_validators(validators); Ok(()) } @@ -451,7 +377,7 @@ impl Housekeeper { if is_local_dev() { warn!("skipping proposer duty update in db"); } else { - self.db.set_proposer_duties(formatted_proposer_duties).await?; + self.db.set_proposer_duties(formatted_proposer_duties); } Ok(()) @@ -466,7 +392,7 @@ impl Housekeeper { async fn primev_update_with_duties( primev_service: EthereumPrimevService, auctioneer: Arc, - db: Arc, + db: DbHandle, proposer_duties: Vec, ) -> Result<(), HousekeeperError> { let primev_validators = @@ -534,26 +460,7 @@ impl Housekeeper { auctioneer.update_builder_infos(&primev_builders_config, false); - db.store_builders_info(primev_builders_config.as_slice()).await?; - - Ok(()) - } - - fn should_update_trusted_proposers(&self, head_slot: u64) -> bool { - let last_updated = self.slots.trusted_proposers(); - head_slot.is_multiple_of(TRUSTED_PROPOSERS_UPDATE_FREQ) || - head_slot.saturating_sub(last_updated) >= TRUSTED_PROPOSERS_UPDATE_FREQ - } - - /// Update the proposer whitelist. - /// - /// This function will fetch the proposer whitelist from the database and update the auctioneer. - /// It will also update the `refreshed_trusted_proposers_slot` to the current `head_slot`. - async fn update_trusted_proposers(&self) -> Result<(), HousekeeperError> { - let start = Instant::now(); - let proposer_whitelist = self.db.get_trusted_proposers().await?; - debug!(proposer_whitelist = proposer_whitelist.len(), duration = ?start.elapsed(), "fetched trusted proposers"); - self.auctioneer.update_trusted_proposers(proposer_whitelist); + db.store_builders_info(primev_builders_config); Ok(()) } @@ -572,12 +479,11 @@ impl Housekeeper { /// Fetch validator registrations for `pub_keys` from database. async fn fetch_signed_validator_registrations( &self, - pubkeys: &[&BlsPublicKeyBytes], + pubkeys: &[BlsPublicKeyBytes], ) -> Result, HousekeeperError> { let registrations: Vec = - self.db.get_validator_registrations_for_pub_keys(pubkeys).await?; - + self.auctioneer.get_validator_registrations_for_pub_keys(pubkeys); let registrations = registrations.into_iter().map(|entry| (*entry.public_key(), entry)).collect(); @@ -593,8 +499,8 @@ impl Housekeeper { HousekeeperError, > { let proposer_duties = self.fetch_duties(epoch.as_u64()).await?; - let pubkeys: Vec<&BlsPublicKeyBytes> = - proposer_duties.iter().map(|duty| &duty.pubkey).collect(); + let pubkeys: Vec = + proposer_duties.iter().map(|duty| duty.pubkey).collect(); let signed_validator_registrations = self.fetch_signed_validator_registrations(&pubkeys).await?; diff --git a/crates/relay/src/housekeeper/inclusion_list/service.rs b/crates/relay/src/housekeeper/inclusion_list/service.rs index 656d4477..311524df 100644 --- a/crates/relay/src/housekeeper/inclusion_list/service.rs +++ b/crates/relay/src/housekeeper/inclusion_list/service.rs @@ -8,10 +8,10 @@ use helix_common::{ local_cache::LocalCache, }; use helix_types::{BlsPublicKeyBytes, Slot}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use crate::{ - auctioneer::Event, database::postgres::postgres_db_service::PostgresDatabaseService, + DbHandle, auctioneer::Event, housekeeper::inclusion_list::http_fetcher::HttpInclusionListFetcher, network::RelayNetworkManager, }; @@ -20,7 +20,7 @@ const MISSING_INCLUSION_LIST_CUTOFF: Duration = Duration::from_secs(6); #[derive(Clone)] pub struct InclusionListService { - db: Arc, + db: DbHandle, local_cache: Arc, http_il_fetcher: HttpInclusionListFetcher, chain_info: Arc, @@ -30,7 +30,7 @@ pub struct InclusionListService { impl InclusionListService { pub fn new( - db: Arc, + db: DbHandle, local_cache: Arc, config: InclusionListConfig, chain_info: Arc, @@ -83,15 +83,7 @@ impl InclusionListService { il: Some(inclusion_list.clone()), }); - match self.db.save_inclusion_list(&inclusion_list, head_slot, &parent_hash, &pub_key).await - { - Ok(_) => { - info!(head_slot = head_slot, "Saved inclusion list to postgres"); - } - Err(err) => { - error!(head_slot, "Could not save inclusion list to postgres. Error: {:?}", err); - } - } + self.db.save_inclusion_list(inclusion_list, head_slot, parent_hash, pub_key); } async fn fetch_inclusion_list_or_timeout(&self, slot: u64) -> Option { diff --git a/crates/relay/src/housekeeper/mod.rs b/crates/relay/src/housekeeper/mod.rs index 45988784..248821ff 100644 --- a/crates/relay/src/housekeeper/mod.rs +++ b/crates/relay/src/housekeeper/mod.rs @@ -19,8 +19,8 @@ pub use slot_info::CurrentSlotInfo; use tokio::sync::broadcast; use crate::{ - auctioneer::Event, beacon::multi_beacon_client::MultiBeaconClient, - database::postgres::postgres_db_service::PostgresDatabaseService, network::RelayNetworkManager, + DbHandle, auctioneer::Event, beacon::multi_beacon_client::MultiBeaconClient, + network::RelayNetworkManager, }; const HEAD_EVENT_CHANNEL_SIZE: usize = 100; @@ -28,7 +28,7 @@ const PAYLOAD_ATTRIBUTE_CHANNEL_SIZE: usize = 300; /// Start housekeeper and chain updater pub async fn start_housekeeper( - db: Arc, + db: DbHandle, auctioneer: Arc, config: &RelayConfig, beacon_client: Arc, diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index 067c5906..4e988581 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -9,9 +9,15 @@ mod website; pub use crate::{ api::{Api, BidAdjustor, DefaultBidAdjustor, start_admin_service, start_api_service}, - auctioneer::{PayloadEntry, SimulatorClient, SimulatorRequest, SlotData, SubmissionPayload}, + auctioneer::{ + AuctioneerHandle, Event, PayloadEntry, RegWorkerHandle, SimulatorClient, SimulatorRequest, + SlotData, SubmissionPayload, spawn_workers, + }, beacon::start_beacon_client, - database::{postgres::postgres_db_service::PostgresDatabaseService, start_db_service}, + database::{ + DbRequest, PendingBlockSubmissionValue, handle::DbHandle, + postgres::postgres_db_service::PostgresDatabaseService, start_db_service, + }, housekeeper::start_housekeeper, network::RelayNetworkManager, website::WebsiteService, diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index 2c90303e..fd32edf6 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -9,6 +9,7 @@ use std::{ use eyre::eyre; use helix_common::{ RelayConfig, + api::builder_api::TopBidUpdate, api_provider::DefaultApiProvider, load_config, load_keypair, local_cache::LocalCache, @@ -18,8 +19,9 @@ use helix_common::{ utils::{init_panic_hook, init_tracing_log}, }; use helix_relay::{ - Api, DefaultBidAdjustor, PostgresDatabaseService, RelayNetworkManager, WebsiteService, - start_admin_service, start_api_service, start_beacon_client, start_db_service, + Api, AuctioneerHandle, DbHandle, DbRequest, DefaultBidAdjustor, Event, + PendingBlockSubmissionValue, RegWorkerHandle, RelayNetworkManager, WebsiteService, + spawn_workers, start_admin_service, start_api_service, start_beacon_client, start_db_service, start_housekeeper, }; use helix_types::BlsKeypair; @@ -60,8 +62,39 @@ fn main() { config.logging.dir_path(), ); + let (db_request_sender, db_request_receiver) = crossbeam_channel::bounded(10_000); + let (db_batch_request_sender, db_batch_request_receiver) = crossbeam_channel::bounded(10_000); + let (top_bid_tx, _) = tokio::sync::broadcast::channel(100); + let event_channel = crossbeam_channel::bounded(10_000); + let event_tx = event_channel.0.clone(); + + let db = DbHandle::new(db_request_sender.clone(), db_batch_request_sender.clone()); + let local_cache = LocalCache::new(); + + // spawn auctioneer + let (auctioneer_handle, registrations_handle) = spawn_workers( + config.clone(), + db.clone(), + local_cache.clone(), + DefaultBidAdjustor {}, + top_bid_tx.clone(), + event_channel, + ); + block_on(start_metrics_server(&config)); - match block_on(run(instance_id, config, keypair)) { + match block_on(start_network_services( + instance_id, + config, + keypair, + db, + local_cache, + auctioneer_handle, + registrations_handle, + db_request_receiver, + db_batch_request_receiver, + top_bid_tx, + event_tx, + )) { Ok(_) => info!("relay exited"), Err(err) => { error!(%err, "relay exited with error"); @@ -70,10 +103,25 @@ fn main() { } } -async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> eyre::Result<()> { +#[allow(clippy::too_many_arguments)] +async fn start_network_services( + instance_id: String, + config: RelayConfig, + keypair: BlsKeypair, + db: DbHandle, + mut local_cache: LocalCache, + auctioneer_handle: AuctioneerHandle, + registrations_handle: RegWorkerHandle, + db_request_receiver: crossbeam_channel::Receiver, + db_batch_request_receiver: crossbeam_channel::Receiver, + top_bid_tx: tokio::sync::broadcast::Sender, + event_channel: crossbeam_channel::Sender, +) -> eyre::Result<()> { let beacon_client = start_beacon_client(&config); let chain_info = beacon_client.load_chain_info().await; let chain_info = Arc::new(chain_info); + local_cache.set_chain_info(chain_info.clone()); + let local_cache = Arc::new(local_cache); info!( instance_id, @@ -87,15 +135,18 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e let known_validators_loaded = Arc::new(AtomicBool::default()); - let db = start_db_service(&config, known_validators_loaded.clone()).await?; - let local_cache = start_auctioneer(db.clone()).await?; + let pg_db = start_db_service( + &config, + known_validators_loaded.clone(), + db_request_receiver, + db_batch_request_receiver, + local_cache.clone(), + ) + .await?; - let event_channel = crossbeam_channel::bounded(10_000); let relay_network_api = RelayNetworkManager::new(config.relay_network.clone(), relay_signing_context.clone()); - let (top_bid_tx, _) = tokio::sync::broadcast::channel(100); - config.router_config.validate_bid_sorter()?; let current_slot_info = start_housekeeper( @@ -104,7 +155,7 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e &config, beacon_client.clone(), chain_info.clone(), - event_channel.0.clone(), + event_channel, relay_network_api.clone(), ) .await @@ -116,6 +167,7 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e tokio::spawn(start_api_service::( config.clone(), + pg_db.clone(), db.clone(), local_cache, current_slot_info, @@ -123,18 +175,18 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e relay_signing_context, beacon_client, Arc::new(DefaultApiProvider {}), - DefaultBidAdjustor {}, known_validators_loaded, terminating.clone(), top_bid_tx, - event_channel, + auctioneer_handle.clone(), + registrations_handle.clone(), relay_network_api.api(), )); let termination_grace_period = config.router_config.shutdown_delay_ms; if config.website.enabled { - tokio::spawn(WebsiteService::run_loop(config, db)); + tokio::spawn(WebsiteService::run_loop(config, pg_db)); } // wait for SIGTERM or SIGINT @@ -157,14 +209,3 @@ async fn run(instance_id: String, config: RelayConfig, keypair: BlsKeypair) -> e Ok(()) } - -pub async fn start_auctioneer(db: Arc) -> eyre::Result> { - let auctioneer = Arc::new(LocalCache::new()); - let auctioneer_clone = auctioneer.clone(); - tokio::spawn(async move { - let builder_infos = db.get_all_builder_infos().await.expect("failed to load builder infos"); - auctioneer_clone.update_builder_infos(&builder_infos, true); - }); - - Ok(auctioneer) -} diff --git a/crates/simulator/src/block_merging/mod.rs b/crates/simulator/src/block_merging/mod.rs index 1fc69a15..e74a2cfe 100644 --- a/crates/simulator/src/block_merging/mod.rs +++ b/crates/simulator/src/block_merging/mod.rs @@ -48,9 +48,8 @@ use crate::{ block_merging::{ error::BlockMergingApiError, types::{ - BlockMergeRequestV1, BlockMergeResponseV1, DistributionConfig, - MergeableOrderBytes, MergeableOrderRecovered, RecoveredTx, SignedTx, SimulatedOrder, - SimulationError, + BlockMergeRequestV1, BlockMergeResponseV1, DistributionConfig, MergeableOrderBytes, + MergeableOrderRecovered, RecoveredTx, SignedTx, SimulatedOrder, SimulationError, }, }, common::CachedRethDb,