Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ parking_lot.workspace = true
prometheus.workspace = true
rand.workspace = true
reqwest.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
Expand Down
110 changes: 104 additions & 6 deletions crates/common/src/local_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,28 @@ use axum::{
response::{IntoResponse, Response},
};
use dashmap::{DashMap, DashSet};
use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock};
use helix_types::{BlsPublicKeyBytes, CryptoError, MergedBlock, SignedValidatorRegistration};
use http::HeaderValue;
use parking_lot::RwLock;
use rustc_hash::FxHashSet;
use tracing::error;

use crate::{
BuilderConfig, BuilderInfo, ProposerInfo,
api::builder_api::{
BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata,
SlotCoordinate,
BuilderConfig, BuilderInfo, ProposerInfo, SignedValidatorRegistrationEntry,
api::{
builder_api::{
BuilderGetValidatorsResponseEntry, InclusionListWithKey, InclusionListWithMetadata,
SlotCoordinate,
},
proposer_api::ValidatorRegistrationInfo,
},
chain_info::ChainInfo,
};

const ESTIMATED_TRUSTED_PROPOSERS: usize = 200_000;
const ESTIMATED_BUILDER_INFOS_UPPER_BOUND: usize = 1000;
const MAX_PRIMEV_PROPOSERS: usize = 64;
const VALIDATOR_REGISTRATION_UPDATE_INTERVAL: u64 = 60 * 60; // 1 hour in seconds

#[derive(Debug, thiserror::Error)]
pub enum AuctioneerError {
Expand Down Expand Up @@ -94,6 +100,14 @@ pub struct LocalCache {
kill_switch: Arc<AtomicBool>,
proposer_duties: Arc<RwLock<Vec<BuilderGetValidatorsResponseEntry>>>,
merged_blocks: Arc<DashMap<B256, MergedBlock>>,
pub validator_registration_cache:
Arc<DashMap<BlsPublicKeyBytes, SignedValidatorRegistrationEntry>>,
pub pending_validator_registrations: Arc<DashSet<BlsPublicKeyBytes>>,
pub known_validators_cache: Arc<RwLock<FxHashSet<BlsPublicKeyBytes>>>,
pub validator_pool_cache: Arc<DashMap<String, String>>,
chain_info: Option<Arc<ChainInfo>>,
pub adjustments_enabled: Arc<AtomicBool>,
pub adjustments_failsafe_trigger: Arc<AtomicBool>,
}

impl LocalCache {
Expand All @@ -104,8 +118,17 @@ impl LocalCache {
let trusted_proposers = Arc::new(DashMap::with_capacity(ESTIMATED_TRUSTED_PROPOSERS));
let primev_proposers = Arc::new(DashSet::with_capacity(MAX_PRIMEV_PROPOSERS));
let kill_switch = Arc::new(AtomicBool::new(false));
let proposer_duties = Arc::new(RwLock::new(Vec::new()));
let proposer_duties = Arc::new(RwLock::new(Vec::with_capacity(1000)));
let merged_blocks = Arc::new(DashMap::with_capacity(1000));
let validator_registration_cache = Arc::new(DashMap::with_capacity(1_8000_000));
let pending_validator_registrations = Arc::new(DashSet::with_capacity(20_000));
let known_validators_cache = Arc::new(RwLock::new(FxHashSet::with_capacity_and_hasher(
1_200_000,
Default::default(),
)));
let validator_pool_cache = Arc::new(DashMap::with_capacity(1000));
let adjustments_enabled = Arc::new(AtomicBool::new(false));
let adjustments_failsafe_trigger = Arc::new(AtomicBool::new(false));

Self {
inclusion_list: Default::default(),
Expand All @@ -116,6 +139,13 @@ impl LocalCache {
kill_switch,
proposer_duties,
merged_blocks,
validator_registration_cache,
pending_validator_registrations,
known_validators_cache,
validator_pool_cache,
chain_info: None,
adjustments_enabled,
adjustments_failsafe_trigger,
}
}

Expand Down Expand Up @@ -234,6 +264,74 @@ impl LocalCache {
self.merged_blocks.get(block_hash).map(|b| b.value().clone())
}

pub fn is_registration_update_required(
&self,
registration: &SignedValidatorRegistration,
) -> bool {
if let Some(existing_entry) =
self.validator_registration_cache.get(&registration.message.pubkey) &&
existing_entry.registration_info.registration.message.timestamp >=
registration
.message
.timestamp
.saturating_sub(VALIDATOR_REGISTRATION_UPDATE_INTERVAL) &&
existing_entry.registration_info.registration.message.fee_recipient ==
registration.message.fee_recipient &&
existing_entry.registration_info.registration.message.gas_limit ==
registration.message.gas_limit
{
// do registration once per hour, unless fee recipient / gas limit has changed

return false;
}
true
}

/// Assume the entries are already validated
pub fn save_validator_registrations(
&self,
entries: impl Iterator<Item = ValidatorRegistrationInfo>,
pool_name: Option<String>,
user_agent: Option<String>,
) {
for entry in entries {
self.pending_validator_registrations.insert(entry.registration.message.pubkey);
self.validator_registration_cache.insert(
entry.registration.message.pubkey,
SignedValidatorRegistrationEntry::new(
entry.clone(),
pool_name.clone(),
user_agent.clone(),
),
);
}
}

pub fn get_validator_pool_name(&self, api_key: &str) -> Option<String> {
self.validator_pool_cache.get(api_key).map(|v| v.value().clone())
}

pub fn get_chain_info(&self) -> Option<Arc<ChainInfo>> {
self.chain_info.clone()
}

pub fn set_chain_info(&mut self, chain_info: Arc<ChainInfo>) {
self.chain_info = Some(chain_info);
}

pub fn get_validator_registrations_for_pub_keys(
&self,
pub_keys: &[BlsPublicKeyBytes],
) -> Vec<SignedValidatorRegistrationEntry> {
let mut registrations = Vec::with_capacity(pub_keys.len());
for pub_key in pub_keys {
if let Some(entry) = self.validator_registration_cache.get(pub_key) {
registrations.push(entry.clone());
}
}
registrations
}

pub fn get_merged_blocks(&self) -> Vec<MergedBlock> {
self.merged_blocks.iter().map(|b| b.value().clone()).collect()
}
Expand Down
8 changes: 4 additions & 4 deletions crates/relay/src/api/builder/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ use axum::{Extension, http::StatusCode, response::IntoResponse};
use helix_common::{RelayConfig, api::builder_api::TopBidUpdate, local_cache::LocalCache};

use crate::{
api::Api, auctioneer::AuctioneerHandle,
database::postgres::postgres_db_service::PostgresDatabaseService, housekeeper::CurrentSlotInfo,
api::Api, auctioneer::AuctioneerHandle, database::handle::DbHandle,
housekeeper::CurrentSlotInfo,
};

pub(crate) const MAX_PAYLOAD_LENGTH: usize = 1024 * 1024 * 20; // 20MB

#[derive(Clone)]
pub struct BuilderApi<A: Api> {
pub local_cache: Arc<LocalCache>,
pub db: Arc<PostgresDatabaseService>,
pub db: DbHandle,
pub curr_slot_info: CurrentSlotInfo,
pub relay_config: Arc<RelayConfig>,
/// Subscriber for TopBid updates, SSZ encoded
Expand All @@ -25,7 +25,7 @@ pub struct BuilderApi<A: Api> {
impl<A: Api> BuilderApi<A> {
pub fn new(
local_cache: Arc<LocalCache>,
db: Arc<PostgresDatabaseService>,
db: DbHandle,
relay_config: RelayConfig,
curr_slot_info: CurrentSlotInfo,
top_bid_tx: tokio::sync::broadcast::Sender<TopBidUpdate>,
Expand Down
9 changes: 2 additions & 7 deletions crates/relay/src/api/builder/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use helix_common::{GossipedPayloadTrace, spawn_tracked, utils::utcnow_ns};
use helix_common::{GossipedPayloadTrace, utils::utcnow_ns};
use tracing::{debug, error};
use uuid::Uuid;

Expand All @@ -20,11 +20,6 @@ impl<A: Api> BuilderApi<A> {
}

// Save gossiped payload trace to db
let db = self.db.clone();
spawn_tracked!(async move {
if let Err(err) = db.save_gossiped_payload_trace(block_hash, trace).await {
error!(%err, "failed to store gossiped payload trace")
}
});
self.db.save_gossiped_payload_trace(block_hash, trace);
}
}
25 changes: 7 additions & 18 deletions crates/relay/src/api/proposer/get_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,13 @@ impl<A: Api> ProposerApi<A> {
let bid_block_hash = *bid.block_hash();
let value = *bid.value();

let db = proposer_api.db.clone();
spawn_tracked!(
async move {
if let Err(err) = db
.save_get_header_call(
params,
bid_block_hash,
value,
trace,
is_mev_boost,
user_agent,
)
.await
{
error!(%err, "error saving get header call to database");
}
}
.in_current_span()
proposer_api.db.save_get_header_call(
params,
bid_block_hash,
value,
trace,
is_mev_boost,
user_agent,
);

let fork = proposer_api.chain_info.current_fork_name();
Expand Down
54 changes: 21 additions & 33 deletions crates/relay/src/api/proposer/get_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,12 @@ impl<A: Api> ProposerApi<A> {
Ok(get_payload_response) => Ok(axum::Json(get_payload_response)),
Err(err) => {
// Save error to DB
if let Err(err) = proposer_api
.db
.save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace)
.await
{
error!(err = ?err, "error saving failed get payload");
}
proposer_api.db.save_failed_get_payload(
slot.into(),
block_hash,
err.to_string(),
trace,
);

Err(err)
}
Expand Down Expand Up @@ -220,13 +219,12 @@ impl<A: Api> ProposerApi<A> {
Ok(_get_payload_response) => Ok(StatusCode::ACCEPTED),
Err(err) => {
// Save error to DB
if let Err(err) = proposer_api
.db
.save_failed_get_payload(slot.into(), block_hash, err.to_string(), trace)
.await
{
error!(err = ?err, "error saving failed get payload");
}
proposer_api.db.save_failed_get_payload(
slot.into(),
block_hash,
err.to_string(),
trace,
);

Err(err)
}
Expand Down Expand Up @@ -293,19 +291,13 @@ impl<A: Api> ProposerApi<A> {
warn!(error = %err, "get_payload was sent too late");

// Save too late request to db for debugging
if let Err(err) = self
.db
.save_too_late_get_payload(
(head_slot + 1).into(),
&proposer_public_key,
&block_hash,
trace.receive,
trace.payload_fetched,
)
.await
{
error!(%err, "failed to save too late get payload");
}
self.db.save_too_late_get_payload(
(head_slot + 1).into(),
proposer_public_key,
block_hash,
trace.receive,
trace.payload_fetched,
);

return Err(err);
}
Expand Down Expand Up @@ -441,7 +433,6 @@ impl<A: Api> ProposerApi<A> {
user_agent: Option<String>,
filtering: Filtering,
) {
let db = self.db.clone();
let trace = *trace;
let params = SavePayloadParams {
slot,
Expand All @@ -454,11 +445,8 @@ impl<A: Api> ProposerApi<A> {
user_agent,
filtering,
};
spawn_tracked!(async move {
if let Err(err) = db.save_delivered_payload(&params).await {
error!(%err, "error saving payload to database");
}
});

self.db.save_delivered_payload(params);
}

pub(crate) async fn gossip_payload(
Expand Down
6 changes: 3 additions & 3 deletions crates/relay/src/api/proposer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use crate::{
api::{Api, router::Terminating},
auctioneer::{AuctioneerHandle, RegWorkerHandle},
beacon::multi_beacon_client::MultiBeaconClient,
database::postgres::postgres_db_service::PostgresDatabaseService,
database::handle::DbHandle,
gossip::GrpcGossiperClientManager,
housekeeper::CurrentSlotInfo,
};

#[derive(Clone)]
pub struct ProposerApi<A: Api> {
pub local_cache: Arc<LocalCache>,
pub db: Arc<PostgresDatabaseService>,
pub db: DbHandle,
pub gossiper: Arc<GrpcGossiperClientManager>,
pub multi_beacon_client: Arc<MultiBeaconClient>,
pub api_provider: Arc<A::ApiProvider>,
Expand All @@ -45,7 +45,7 @@ pub struct ProposerApi<A: Api> {
impl<A: Api> ProposerApi<A> {
pub fn new(
local_cache: Arc<LocalCache>,
db: Arc<PostgresDatabaseService>,
db: DbHandle,
gossiper: Arc<GrpcGossiperClientManager>,
api_provider: Arc<A::ApiProvider>,
signing_context: Arc<RelaySigningContext>,
Expand Down
Loading