diff --git a/Cargo.lock b/Cargo.lock index d6ed74db..811093df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,7 +75,7 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", - "pallas", + "pallas 0.32.1", "tokio", "tracing", ] @@ -123,7 +123,7 @@ dependencies = [ "dashmap", "hex", "imbl 6.0.0", - "pallas", + "pallas 0.32.1", "serde", "serde_json", "tokio", @@ -139,7 +139,7 @@ dependencies = [ "caryatid_sdk", "config", "hex", - "pallas", + "pallas 0.32.1", "reqwest 0.12.23", "serde_json", "tokio", @@ -174,7 +174,7 @@ dependencies = [ "chrono", "config", "mithril-client", - "pallas", + "pallas 0.32.1", "reqwest 0.11.27", "serde_json", "tokio", @@ -192,7 +192,7 @@ dependencies = [ "config", "hex", "num-rational", - "pallas", + "pallas 0.32.1", "reqwest 0.11.27", "serde", "serde_json", @@ -234,7 +234,7 @@ dependencies = [ "config", "fraction", "hex", - "pallas", + "pallas 0.32.1", "serde_json", "tokio", "tracing", @@ -266,6 +266,7 @@ dependencies = [ "dashmap", "hex", "imbl 5.0.0", + "pallas 0.33.0", "rayon", "serde", "serde_json", @@ -285,8 +286,8 @@ dependencies = [ "caryatid_sdk", "config", "hex", - "pallas-addresses", - "pallas-crypto", + "pallas-addresses 0.32.1", + "pallas-crypto 0.32.1", "serde", "serde_json", "serde_json_any_key", @@ -306,7 +307,7 @@ dependencies = [ "config", "futures", "hex", - "pallas", + "pallas 0.32.1", "serde", "serde_json", "tokio", @@ -322,7 +323,7 @@ dependencies = [ "caryatid_sdk", "config", "crossbeam", - "pallas", + "pallas 0.32.1", "serde", "serde_json", "tokio", @@ -3302,11 +3303,11 @@ dependencies = [ "mithril-build-script", "mithril-stm", "nom 8.0.0", - "pallas-addresses", - "pallas-codec", - "pallas-network", - "pallas-primitives", - "pallas-traverse", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-network 0.32.1", + "pallas-primitives 0.32.1", + "pallas-traverse 0.32.1", "rand_chacha 0.3.1", "rand_core 0.6.4", "rayon", @@ -3693,16 +3694,33 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6cc85b0d73cc19b7c1e09d6f2c5c4abfad3923671654ba6ef8fd00c9d0ee4c58" dependencies = [ - "pallas-addresses", - "pallas-codec", - "pallas-configs", - "pallas-crypto", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-configs 0.32.1", + "pallas-crypto 0.32.1", "pallas-hardano", - "pallas-network", - "pallas-primitives", - "pallas-traverse", - "pallas-txbuilder", - "pallas-utxorpc", + "pallas-network 0.32.1", + "pallas-primitives 0.32.1", + "pallas-traverse 0.32.1", + "pallas-txbuilder 0.32.1", + "pallas-utxorpc 0.32.1", +] + +[[package]] +name = "pallas" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37bba5e9e84978df0d42b72c3b92ead5a3b1c4852f66c08b648a5c057f58717a" +dependencies = [ + "pallas-addresses 0.33.0", + "pallas-codec 0.33.0", + "pallas-configs 0.33.0", + "pallas-crypto 0.33.0", + "pallas-network 0.33.0", + "pallas-primitives 0.33.0", + "pallas-traverse 0.33.0", + "pallas-txbuilder 0.33.0", + "pallas-utxorpc 0.33.0", ] [[package]] @@ -3716,8 +3734,24 @@ dependencies = [ "crc", "cryptoxide", "hex", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "thiserror 1.0.69", +] + +[[package]] +name = "pallas-addresses" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18f5f4dd205316335bf8eef77227e01a8a00b1fd60503d807520e93dd0362d0e" +dependencies = [ + "base58", + "bech32 0.9.1", + "crc", + "cryptoxide", + "hex", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", "thiserror 1.0.69", ] @@ -3729,11 +3763,27 @@ checksum = "6a861573364d48ff0952b12d3f139e05a843b8209f134a0c2b028449cb59ed68" dependencies = [ "chrono", "hex", - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-primitives", - "pallas-traverse", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-primitives 0.32.1", + "pallas-traverse 0.32.1", + "rand 0.8.5", +] + +[[package]] +name = "pallas-applying" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b196174663e1c4eb80a286b8ddca78f75fca5fc57b0baaa5b1143a6dd76ca71b" +dependencies = [ + "chrono", + "hex", + "pallas-addresses 0.33.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", + "pallas-primitives 0.33.0", + "pallas-traverse 0.33.0", "rand 0.8.5", ] @@ -3749,6 +3799,18 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pallas-codec" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2737b05f0dbb6d197feeb26ef15d2567e54833184bd469f5655a0537da89fa" +dependencies = [ + "hex", + "minicbor 0.25.1", + "serde", + "thiserror 1.0.69", +] + [[package]] name = "pallas-configs" version = "0.32.1" @@ -3758,10 +3820,28 @@ dependencies = [ "base64 0.22.1", "hex", "num-rational", - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-primitives", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-primitives 0.32.1", + "serde", + "serde_json", + "serde_with 3.14.0", +] + +[[package]] +name = "pallas-configs" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4e63bff98bd71b3057a0986dc72e6ba58afaf063bce3dc8243fda5f0665726" +dependencies = [ + "base64 0.22.1", + "hex", + "num-rational", + "pallas-addresses 0.33.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", + "pallas-primitives 0.33.0", "serde", "serde_json", "serde_with 3.14.0", @@ -3775,7 +3855,22 @@ checksum = "59c89ea16190a87a1d8bd36923093740a2b659ed6129f4636329319a70cc4db3" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.32.1", + "rand_core 0.6.4", + "serde", + "thiserror 1.0.69", + "zeroize", +] + +[[package]] +name = "pallas-crypto" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0368945cd093e550febe36aef085431b1611c2e9196297cd70f4b21a4add054c" +dependencies = [ + "cryptoxide", + "hex", + "pallas-codec 0.33.0", "rand_core 0.6.4", "serde", "thiserror 1.0.69", @@ -3789,8 +3884,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f980c9e0579642a5c8a902231a499b826fdb8673585be9d3068eb9b04ccc980" dependencies = [ "binary-layout", - "pallas-network", - "pallas-traverse", + "pallas-network 0.32.1", + "pallas-traverse 0.32.1", "tap", "thiserror 1.0.69", "tracing", @@ -3805,8 +3900,26 @@ dependencies = [ "byteorder", "hex", "itertools 0.13.0", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "rand 0.8.5", + "socket2 0.5.10", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "pallas-network" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1244da7a760a08b8a9d9a28a28112f10a7b6476d64192696a269cfd09a7ec55c" +dependencies = [ + "byteorder", + "hex", + "itertools 0.13.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", "rand 0.8.5", "socket2 0.5.10", "thiserror 1.0.69", @@ -3824,8 +3937,24 @@ dependencies = [ "bech32 0.9.1", "hex", "log", - "pallas-codec", - "pallas-crypto", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "serde", + "serde_json", +] + +[[package]] +name = "pallas-primitives" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb2acde8875c43446194d387c60fe2d6a127e4f8384bef3dcabd5a04e9422429" +dependencies = [ + "base58", + "bech32 0.9.1", + "hex", + "log", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", "serde", "serde_json", ] @@ -3838,10 +3967,27 @@ checksum = "be7fbb1db75a0b6b32d1808b2cc5c7ba6dd261f289491bb86998b987b4716883" dependencies = [ "hex", "itertools 0.13.0", - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-primitives", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-primitives 0.32.1", + "paste", + "serde", + "thiserror 1.0.69", +] + +[[package]] +name = "pallas-traverse" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab64895a0d94fed1ef2d99dd37e480ed0483e91eb98dcd2f94cc614fb9575173" +dependencies = [ + "hex", + "itertools 0.13.0", + "pallas-addresses 0.33.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", + "pallas-primitives 0.33.0", "paste", "serde", "thiserror 1.0.69", @@ -3854,12 +4000,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff83ae515a88b1ecf5354468d9fd3562d915e5eceb5c9467f6b1cdce60a3e9a" dependencies = [ "hex", - "pallas-addresses", - "pallas-codec", - "pallas-crypto", - "pallas-primitives", - "pallas-traverse", - "pallas-wallet", + "pallas-addresses 0.32.1", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-primitives 0.32.1", + "pallas-traverse 0.32.1", + "pallas-wallet 0.32.1", + "serde", + "serde_json", + "thiserror 1.0.69", +] + +[[package]] +name = "pallas-txbuilder" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46ff1f49d99aced71b20daa68577167e1db3f0aaffe92fbc1de6df0b6002a66e" +dependencies = [ + "hex", + "pallas-addresses 0.33.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", + "pallas-primitives 0.33.0", + "pallas-traverse 0.33.0", + "pallas-wallet 0.33.0", "serde", "serde_json", "thiserror 1.0.69", @@ -3871,11 +4035,26 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "810ccda35242fef9ea583a0819da7617b6761a86c6070f16aea27ac80ad4da75" dependencies = [ - "pallas-applying", - "pallas-codec", - "pallas-crypto", - "pallas-primitives", - "pallas-traverse", + "pallas-applying 0.32.1", + "pallas-codec 0.32.1", + "pallas-crypto 0.32.1", + "pallas-primitives 0.32.1", + "pallas-traverse 0.32.1", + "prost-types", + "utxorpc-spec", +] + +[[package]] +name = "pallas-utxorpc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bdf89daca5ebfbcd9b5cf8b480486302ffd3401f6891d3c4f02087fd7687b94" +dependencies = [ + "pallas-applying 0.33.0", + "pallas-codec 0.33.0", + "pallas-crypto 0.33.0", + "pallas-primitives 0.33.0", + "pallas-traverse 0.33.0", "prost-types", "utxorpc-spec", ] @@ -3890,7 +4069,22 @@ dependencies = [ "bip39", "cryptoxide", "ed25519-bip32", - "pallas-crypto", + "pallas-crypto 0.32.1", + "rand 0.8.5", + "thiserror 1.0.69", +] + +[[package]] +name = "pallas-wallet" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91b48fe1d0d07b425aed4b1c6ac5d962e0a392ccc58e2f3faa8ad250a5c364" +dependencies = [ + "bech32 0.9.1", + "bip39", + "cryptoxide", + "ed25519-bip32", + "pallas-crypto 0.33.0", "rand 0.8.5", "thiserror 1.0.69", ] diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 6b339673..d196aa0b 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{DRepChoice, KeyHash}; +use crate::{DRepChoice, KeyHash, PoolLiveStakeInfo}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -27,6 +27,7 @@ pub enum AccountsStateQuery { GetOptimalPoolSizing, GetPoolsLiveStakes { pools_operators: Vec> }, GetPoolDelegators { pool_operator: KeyHash }, + GetPoolLiveStake { pool_operator: KeyHash }, // Dreps related queries GetDrepDelegators { drep: DRepChoice }, @@ -55,6 +56,7 @@ pub enum AccountsStateQueryResponse { OptimalPoolSizing(Option), PoolsLiveStakes(Vec), PoolDelegators(PoolDelegators), + PoolLiveStake(PoolLiveStakeInfo), // DReps related responses DrepDelegators(DrepDelegators), diff --git a/common/src/queries/epochs.rs b/common/src/queries/epochs.rs index c951eb06..4218d39c 100644 --- a/common/src/queries/epochs.rs +++ b/common/src/queries/epochs.rs @@ -1,4 +1,4 @@ -use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, BlockHash, KeyHash}; +use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, KeyHash}; pub const DEFAULT_EPOCHS_QUERY_TOPIC: (&str, &str) = ("epochs-state-query-topic", "cardano.query.epochs"); @@ -13,12 +13,7 @@ pub enum EpochsStateQuery { GetEpochStakeDistributionByPool { epoch_number: u64 }, GetEpochBlockDistribution { epoch_number: u64 }, GetEpochBlockDistributionByPool { epoch_number: u64 }, - - // Pools related queries - GetBlocksMintedByPools { vrf_key_hashes: Vec }, - GetTotalBlocksMintedByPools { vrf_key_hashes: Vec }, - GetBlocksMintedInfoByPool { vrf_key_hash: KeyHash }, - GetBlockHashesByPool { vrf_key_hash: KeyHash }, + GetLatestEpochBlocksMintedByPool { vrf_key_hash: KeyHash }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -31,12 +26,7 @@ pub enum EpochsStateQueryResponse { EpochStakeDistributionByPool(EpochStakeDistributionByPool), EpochBlockDistribution(EpochBlockDistribution), EpochBlockDistributionByPool(EpochBlockDistributionByPool), - - // Pools related responses - BlocksMintedByPools(BlocksMintedByPools), - TotalBlocksMintedByPools(TotalBlocksMintedByPools), - BlocksMintedInfoByPool(BlocksMintedInfoByPool), - BlockHashesByPool(BlockHashesByPool), + LatestEpochBlocksMintedByPool(u64), NotFound, Error(String), @@ -74,26 +64,3 @@ pub struct EpochBlockDistribution {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct EpochBlockDistributionByPool {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlocksMintedByPools { - // this is in same order of vrf_key_hashes from EpochsStateQuery::BlocksMintedByPools - pub blocks_minted: Vec, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TotalBlocksMintedByPools { - // this is in same order of vrf_key_hashes from EpochsStateQuery::TotalBlocksMinted - pub total_blocks_minted: Vec, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlocksMintedInfoByPool { - pub total_blocks_minted: u64, - pub epoch_blocks_minted: u64, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct BlockHashesByPool { - pub hashes: Vec, -} diff --git a/common/src/queries/pools.rs b/common/src/queries/pools.rs index 72ebf151..6608cf45 100644 --- a/common/src/queries/pools.rs +++ b/common/src/queries/pools.rs @@ -1,6 +1,6 @@ use crate::{ - queries::governance::VoteRecord, rational_number::RationalNumber, KeyHash, PoolEpochState, - PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + queries::governance::VoteRecord, rational_number::RationalNumber, BlockHash, KeyHash, + PoolEpochState, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, }; pub const DEFAULT_POOLS_QUERY_TOPIC: (&str, &str) = @@ -20,26 +20,35 @@ pub enum PoolsStateQuery { pools_operators: Vec, epoch: u64, }, + GetPoolsTotalBlocksMinted { + pools_operators: Vec, + }, GetPoolInfo { - pool_id: Vec, + pool_id: KeyHash, }, GetPoolHistory { - pool_id: Vec, + pool_id: KeyHash, }, GetPoolMetadata { - pool_id: Vec, + pool_id: KeyHash, }, GetPoolRelays { - pool_id: Vec, + pool_id: KeyHash, }, GetPoolDelegators { - pool_id: Vec, + pool_id: KeyHash, + }, + GetPoolTotalBlocksMinted { + pool_id: KeyHash, + }, + GetPoolBlockHashes { + pool_id: KeyHash, }, GetPoolUpdates { - pool_id: Vec, + pool_id: KeyHash, }, GetPoolVotes { - pool_id: Vec, + pool_id: KeyHash, }, } @@ -51,11 +60,14 @@ pub enum PoolsStateQueryResponse { PoolsRetiringList(Vec), PoolActiveStakeInfo(PoolActiveStakeInfo), PoolsActiveStakes(Vec), - PoolInfo(PoolInfo), + PoolsTotalBlocksMinted(Vec), + PoolInfo(PoolRegistration), PoolHistory(Vec), PoolMetadata(PoolMetadata), PoolRelays(Vec), PoolDelegators(PoolDelegators), + PoolTotalBlocksMinted(u64), + PoolBlockHashes(Vec), PoolUpdates(Vec), PoolVotes(Vec), NotFound, @@ -73,9 +85,6 @@ pub struct PoolActiveStakeInfo { pub active_size: RationalNumber, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolInfo {} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolDelegators { pub delegators: Vec<(KeyHash, u64)>, diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 6ccd5e10..8658f675 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -10,8 +10,8 @@ use std::{ use crate::{ math::update_value_with_delta, messages::DRepDelegationDistribution, DRepChoice, - DRepCredential, DelegatedStake, KeyHash, Lovelace, StakeAddressDelta, StakeCredential, - Withdrawal, + DRepCredential, DelegatedStake, KeyHash, Lovelace, PoolLiveStakeInfo, StakeAddressDelta, + StakeCredential, Withdrawal, }; use anyhow::Result; use dashmap::DashMap; @@ -110,6 +110,34 @@ impl StakeAddressMap { self.get(stake_key).map(|sas| sas.registered).unwrap_or(false) } + /// Get Pool's Live Stake Info + pub fn get_pool_live_stake_info(&self, spo: &KeyHash) -> PoolLiveStakeInfo { + let total_live_stakes = AtomicU64::new(0); + let live_stake = AtomicU64::new(0); + let live_delegators = AtomicU64::new(0); + + // Par Iter stake addresses values + self.inner.par_iter().for_each(|(_, sas)| { + total_live_stakes.fetch_add(sas.utxo_value, std::sync::atomic::Ordering::Relaxed); + if sas.delegated_spo.as_ref().map(|d_spo| d_spo.eq(spo)).unwrap_or(false) { + live_stake.fetch_add( + sas.utxo_value + sas.rewards, + std::sync::atomic::Ordering::Relaxed, + ); + live_delegators.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + }); + + let total_live_stakes = total_live_stakes.load(std::sync::atomic::Ordering::Relaxed); + let live_stake = live_stake.load(std::sync::atomic::Ordering::Relaxed); + let live_delegators = live_delegators.load(std::sync::atomic::Ordering::Relaxed); + PoolLiveStakeInfo { + live_stake, + live_delegators, + total_live_stakes, + } + } + /// Get Pool's Live Stake (same order as spos) pub fn get_pools_live_stakes(&self, spos: &Vec) -> Vec { let mut live_stakes_map = HashMap::::new(); diff --git a/common/src/types.rs b/common/src/types.rs index a9d56fb3..cb9d22fe 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -638,7 +638,7 @@ pub struct PoolRetirement { } /// Pool Update Action -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum PoolUpdateAction { Registered, Deregistered, @@ -670,6 +670,14 @@ impl PoolUpdateEvent { } } +/// Pool Live Stake Info +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PoolLiveStakeInfo { + pub live_stake: u64, + pub live_delegators: u64, + pub total_live_stakes: u64, +} + /// Pool Epoch History Data #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolEpochState { diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 6823d00b..a63bb5a2 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -461,6 +461,12 @@ impl AccountsState { }) } + AccountsStateQuery::GetPoolLiveStake { pool_operator } => { + AccountsStateQueryResponse::PoolLiveStake( + state.get_pool_live_stake_info(pool_operator), + ) + } + AccountsStateQuery::GetDrepDelegators { drep } => { AccountsStateQueryResponse::DrepDelegators(DrepDelegators { delegators: state.get_drep_delegators(drep), diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index ae3644b9..05e622e4 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -3,6 +3,7 @@ use crate::monetary::calculate_monetary_change; use crate::rewards::{RewardsResult, RewardsState}; use crate::snapshot::Snapshot; use acropolis_common::queries::accounts::OptimalPoolSizing; +use acropolis_common::PoolLiveStakeInfo; use acropolis_common::{ math::update_value_with_delta, messages::{ @@ -106,6 +107,11 @@ impl State { Some(OptimalPoolSizing { total_supply, nopt }) } + /// Get Pool Live Stake Info + pub fn get_pool_live_stake_info(&self, pool_operator: &KeyHash) -> PoolLiveStakeInfo { + self.stake_addresses.lock().unwrap().get_pool_live_stake_info(pool_operator) + } + /// Get Pools Live stake pub fn get_pools_live_stakes(&self, pool_operators: &Vec) -> Vec { self.stake_addresses.lock().unwrap().get_pools_live_stakes(pool_operators) diff --git a/modules/epochs_state/src/epochs_history.rs b/modules/epochs_state/src/epochs_history.rs index aeb173b6..84123c99 100644 --- a/modules/epochs_state/src/epochs_history.rs +++ b/modules/epochs_state/src/epochs_history.rs @@ -73,19 +73,19 @@ mod tests { #[test] fn epochs_history_is_none_when_store_history_is_false() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false, false)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false)); assert!(epochs_history.epochs_history.is_none()); } #[test] fn epochs_history_is_some_when_store_history_is_true() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); assert!(epochs_history.epochs_history.is_some()); } #[test] fn handle_epoch_activity_saves_history() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); let block = make_block(200); epochs_history.handle_epoch_activity( &block, diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 44b2105a..cd583dbc 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -4,8 +4,7 @@ use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::epochs::{ - BlockHashesByPool, BlocksMintedByPools, BlocksMintedInfoByPool, EpochInfo, - EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, TotalBlocksMintedByPools, + EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC, }, state_history::{StateHistory, StateHistoryStore}, @@ -50,14 +49,13 @@ impl EpochsState { async fn run( history: Arc>>, epochs_history: EpochsHistoryState, - store_config: &StoreConfig, mut headers_subscription: Box>, mut fees_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, ) -> Result<()> { loop { // Get a mutable state - let mut state = history.lock().await.get_or_init_with(|| State::new(store_config)); + let mut state = history.lock().await.get_or_init_with(|| State::new()); let mut current_block: Option = None; // Read both topics in parallel @@ -113,9 +111,7 @@ impl EpochsState { // are suppressed upstream match MultiEraHeader::decode(variant, None, &header_msg.raw) { Ok(header) => { - if let Some(vrf_vkey) = header.vrf_vkey() { - state.handle_mint(&block_info, Some(vrf_vkey)); - } + state.handle_mint(&block_info, header.vrf_vkey()); } Err(e) => error!("Can't decode header {}: {e}", block_info.slot), @@ -231,43 +227,12 @@ impl EpochsState { } } - EpochsStateQuery::GetBlocksMintedByPools { vrf_key_hashes } => { - EpochsStateQueryResponse::BlocksMintedByPools(BlocksMintedByPools { - blocks_minted: state.get_blocks_minted_by_pools(vrf_key_hashes), - }) - } - - EpochsStateQuery::GetTotalBlocksMintedByPools { vrf_key_hashes } => { - EpochsStateQueryResponse::TotalBlocksMintedByPools( - TotalBlocksMintedByPools { - total_blocks_minted: state - .get_total_blocks_minted_by_pools(vrf_key_hashes), - }, + EpochsStateQuery::GetLatestEpochBlocksMintedByPool { vrf_key_hash } => { + EpochsStateQueryResponse::LatestEpochBlocksMintedByPool( + state.get_latest_epoch_blocks_minted_by_pool(vrf_key_hash), ) } - EpochsStateQuery::GetBlocksMintedInfoByPool { vrf_key_hash } => { - let (total_blocks_minted, epoch_blocks_minted) = - state.get_blocks_minted_data_by_pool(vrf_key_hash); - EpochsStateQueryResponse::BlocksMintedInfoByPool(BlocksMintedInfoByPool { - total_blocks_minted, - epoch_blocks_minted, - }) - } - - EpochsStateQuery::GetBlockHashesByPool { vrf_key_hash } => { - if state.is_block_hashes_enabled() { - let hashes = state.get_block_hashes(vrf_key_hash); - EpochsStateQueryResponse::BlockHashesByPool(BlockHashesByPool { - hashes, - }) - } else { - EpochsStateQueryResponse::Error( - "Block hashes are not enabled".to_string(), - ) - } - } - _ => EpochsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query @@ -284,7 +249,6 @@ impl EpochsState { Self::run( history, epochs_history, - &store_config, headers_subscription, fees_subscription, epoch_activity_publisher, diff --git a/modules/epochs_state/src/state.rs b/modules/epochs_state/src/state.rs index 7ca28a2c..80d5bfcb 100644 --- a/modules/epochs_state/src/state.rs +++ b/modules/epochs_state/src/state.rs @@ -1,17 +1,11 @@ //! Acropolis epoch activity counter: state storage -use acropolis_common::{ - crypto::keyhash, messages::EpochActivityMessage, BlockHash, BlockInfo, KeyHash, -}; -use imbl::{HashMap, Vector}; +use acropolis_common::{crypto::keyhash, messages::EpochActivityMessage, BlockInfo, KeyHash}; +use imbl::HashMap; use tracing::info; -use crate::store_config::StoreConfig; - #[derive(Default, Debug, Clone)] pub struct State { - store_config: StoreConfig, - // block number block: u64, @@ -26,53 +20,27 @@ pub struct State { // fees seen this epoch epoch_fees: u64, - - // Total blocks minted till block number - // Keyed by vrf_key_hash - total_blocks_minted: HashMap, - - // block hashes by vrf_key_hash - block_hashes: Option>>, } impl State { // Constructor - pub fn new(store_config: &StoreConfig) -> Self { + pub fn new() -> Self { Self { - store_config: store_config.clone(), block: 0, epoch: 0, blocks_minted: HashMap::new(), epoch_blocks: 0, epoch_fees: 0, - total_blocks_minted: HashMap::new(), - block_hashes: if store_config.store_block_hashes { - Some(HashMap::new()) - } else { - None - }, } } - pub fn is_block_hashes_enabled(&self) -> bool { - self.store_config.store_block_hashes - } - // Handle a block minting, taking the SPO's VRF vkey - pub fn handle_mint(&mut self, block: &BlockInfo, vrf_vkey: Option<&[u8]>) { + pub fn handle_mint(&mut self, _block: &BlockInfo, vrf_vkey: Option<&[u8]>) { self.epoch_blocks += 1; if let Some(vrf_vkey) = vrf_vkey { let vrf_key_hash = keyhash(vrf_vkey); // Count one on this hash *(self.blocks_minted.entry(vrf_key_hash.clone()).or_insert(0)) += 1; - *(self.total_blocks_minted.entry(vrf_key_hash.clone()).or_insert(0)) += 1; - - if let Some(block_hashes) = self.block_hashes.as_mut() { - block_hashes - .entry(vrf_key_hash.clone()) - .or_insert_with(Vector::new) - .push_back(block.hash); - } } } @@ -113,39 +81,8 @@ impl State { } } - /// Get current epoch's blocks minted for each vrf key hash - pub fn get_blocks_minted_by_pools(&self, vrf_key_hashes: &Vec) -> Vec { - vrf_key_hashes - .iter() - .map(|key_hash| self.blocks_minted.get(key_hash).map(|v| *v as u64).unwrap_or(0)) - .collect() - } - - /// Get epoch's total blocks minted for each vrf key hash till current block number - pub fn get_total_blocks_minted_by_pools(&self, vrf_key_hashes: &Vec) -> Vec { - vrf_key_hashes - .iter() - .map(|key_hash| self.total_blocks_minted.get(key_hash).map(|v| *v as u64).unwrap_or(0)) - .collect() - } - - pub fn get_blocks_minted_data_by_pool(&self, vrf_key_hash: &KeyHash) -> (u64, u64) { - ( - self.total_blocks_minted.get(vrf_key_hash).map(|v| *v as u64).unwrap_or(0), - self.blocks_minted.get(vrf_key_hash).map(|v| *v as u64).unwrap_or(0), - ) - } - - /// Get Block Hashes by Vrf Key Hash - pub fn get_block_hashes(&self, vrf_key_hash: &KeyHash) -> Vec { - let Some(block_hashes) = self.block_hashes.as_ref() else { - return vec![]; - }; - - block_hashes - .get(vrf_key_hash) - .map(|hashes| hashes.into_iter().cloned().collect()) - .unwrap_or_default() + pub fn get_latest_epoch_blocks_minted_by_pool(&self, vrf_key_hash: &KeyHash) -> u64 { + self.blocks_minted.get(vrf_key_hash).map(|v| *v as u64).unwrap_or(0) } } @@ -191,16 +128,15 @@ mod tests { #[test] fn initial_state_is_zeroed() { - let state = State::new(&StoreConfig::default()); + let state = State::new(); assert_eq!(state.epoch_blocks, 0); assert_eq!(state.epoch_fees, 0); assert!(state.blocks_minted.is_empty()); - assert!(state.block_hashes.is_none()); } #[test] fn handle_mint_single_vrf_records_counts() { - let mut state = State::new(&StoreConfig::default()); + let mut state = State::new(); let vrf = b"vrf_key"; let mut block = make_block(100); state.handle_mint(&block, Some(vrf)); @@ -213,12 +149,11 @@ mod tests { assert_eq!(state.epoch_blocks, 2); assert_eq!(state.blocks_minted.len(), 1); assert_eq!(state.blocks_minted.get(&keyhash(vrf)), Some(&2)); - assert_eq!(state.total_blocks_minted.get(&keyhash(vrf)), Some(&2)); } #[test] fn handle_mint_multiple_vrf_records_counts() { - let mut state = State::new(&StoreConfig::default()); + let mut state = State::new(); let mut block = make_block(100); state.handle_mint(&block, Some(b"vrf_1")); block.number += 1; @@ -237,33 +172,13 @@ mod tests { Some(2) ); - let blocks_minted_data = state.get_blocks_minted_data_by_pool(&keyhash(b"vrf_2")); - assert_eq!(blocks_minted_data.0, 2); - assert_eq!(blocks_minted_data.1, 2); - } - - #[test] - fn store_block_hashes_after_handle_mint() { - let mut state = State::new(&StoreConfig::new(false, true)); - let mut block = make_block(100); - state.handle_mint(&block, Some(b"vrf_1")); - block.number += 1; - state.handle_mint(&block, Some(b"vrf_2")); - block.number += 1; - state.handle_mint(&block, Some(b"vrf_2")); - - let block_hashes_1 = state.get_block_hashes(&keyhash(b"vrf_1")); - assert_eq!(block_hashes_1.len(), 1); - assert_eq!(block_hashes_1[0], block.hash); - let block_hashes_2 = state.get_block_hashes(&keyhash(b"vrf_2")); - assert_eq!(block_hashes_2.len(), 2); - assert_eq!(block_hashes_2[0], block.hash); - assert_eq!(block_hashes_2[1], block.hash); + let blocks_minted = state.get_latest_epoch_blocks_minted_by_pool(&keyhash(b"vrf_2")); + assert_eq!(blocks_minted, 2); } #[test] fn handle_fees_counts_fees() { - let mut state = State::new(&StoreConfig::default()); + let mut state = State::new(); let mut block = make_block(100); state.handle_fees(&block, 100); @@ -275,7 +190,7 @@ mod tests { #[test] fn end_epoch_resets_and_returns_message() { - let mut state = State::new(&StoreConfig::default()); + let mut state = State::new(); let block = make_block(1); state.handle_mint(&block, Some(b"vrf_1")); state.handle_fees(&block, 123); @@ -297,9 +212,8 @@ mod tests { assert_eq!(state.epoch_fees, 0); assert!(state.blocks_minted.is_empty()); - let blocks_minted_data = state.get_blocks_minted_data_by_pool(&keyhash(b"vrf_1")); - assert_eq!(blocks_minted_data.0, 1); - assert_eq!(blocks_minted_data.1, 0); + let blocks_minted = state.get_latest_epoch_blocks_minted_by_pool(&keyhash(b"vrf_1")); + assert_eq!(blocks_minted, 0); } #[tokio::test] @@ -318,39 +232,24 @@ mod tests { block.number += 1; state.handle_mint(&block, Some(b"vrf_1")); state.handle_fees(&block, 123); - history.lock().await.commit(block.number, state); - - let mut state = history.lock().await.get_current_state(); - block = make_block(2); - let _ = state.end_epoch(&block); - state.handle_mint(&block, Some(b"vrf_1")); - state.handle_fees(&block, 123); - history.lock().await.commit(block.number, state); - - let state = history.lock().await.get_current_state(); - assert_eq!(state.epoch_blocks, 1); - assert_eq!(state.epoch_fees, 123); - assert_eq!( - 1, - state.get_blocks_minted_by_pools(&vec![keyhash(b"vrf_1")])[0] - ); assert_eq!( - 3, - state.get_total_blocks_minted_by_pools(&vec![keyhash(b"vrf_1")])[0] + state.get_latest_epoch_blocks_minted_by_pool(&keyhash(b"vrf_1")), + 2 ); + history.lock().await.commit(block.number, state); - // roll back of epoch 2 - block = make_rolled_back_block(2); + block = make_rolled_back_block(0); let mut state = history.lock().await.get_rolled_back_state(block.number); - let _ = state.end_epoch(&block); state.handle_mint(&block, Some(b"vrf_2")); state.handle_fees(&block, 123); - history.lock().await.commit(block.number, state); - - let state = history.lock().await.get_current_state(); assert_eq!( - 2, - state.get_total_blocks_minted_by_pools(&vec![keyhash(b"vrf_1")])[0] + state.get_latest_epoch_blocks_minted_by_pool(&keyhash(b"vrf_1")), + 0 + ); + assert_eq!( + state.get_latest_epoch_blocks_minted_by_pool(&keyhash(b"vrf_2")), + 1 ); + history.lock().await.commit(block.number, state); } } diff --git a/modules/epochs_state/src/store_config.rs b/modules/epochs_state/src/store_config.rs index d74e51d4..d0ef3f33 100644 --- a/modules/epochs_state/src/store_config.rs +++ b/modules/epochs_state/src/store_config.rs @@ -3,21 +3,16 @@ use std::sync::Arc; use config::Config; const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false); -const DEFAULT_STORE_BLOCK_HAHSES: (&str, bool) = ("store-block-hashes", false); #[derive(Default, Debug, Clone)] pub struct StoreConfig { pub store_history: bool, - pub store_block_hashes: bool, } impl StoreConfig { #[allow(dead_code)] - pub fn new(store_history: bool, store_block_hashes: bool) -> Self { - Self { - store_history, - store_block_hashes, - } + pub fn new(store_history: bool) -> Self { + Self { store_history } } } @@ -27,9 +22,6 @@ impl From> for StoreConfig { store_history: config .get_bool(DEFAULT_STORE_HISTORY.0) .unwrap_or(DEFAULT_STORE_HISTORY.1), - store_block_hashes: config - .get_bool(DEFAULT_STORE_BLOCK_HAHSES.0) - .unwrap_or(DEFAULT_STORE_BLOCK_HAHSES.1), } } } diff --git a/modules/rest_blockfrost/src/handlers/pools.rs b/modules/rest_blockfrost/src/handlers/pools.rs index 17f72717..4c28f897 100644 --- a/modules/rest_blockfrost/src/handlers/pools.rs +++ b/modules/rest_blockfrost/src/handlers/pools.rs @@ -7,8 +7,9 @@ use acropolis_common::{ pools::{PoolsStateQuery, PoolsStateQueryResponse}, utils::query_state, }, + rest_helper::ToCheckedF64, serialization::Bech32WithHrp, - PoolRetirement, StakeCredential, + PoolRetirement, PoolUpdateAction, StakeCredential, TxHash, }; use anyhow::Result; use caryatid_sdk::Context; @@ -19,7 +20,7 @@ use tracing::warn; use crate::{ handlers_config::HandlersConfig, - types::{PoolDelegatorRest, PoolRelayRest}, + types::{PoolDelegatorRest, PoolInfoRest, PoolRelayRest, PoolUpdateEventRest, PoolVoteRest}, }; use crate::{ types::{PoolEpochStateRest, PoolExtendedRest, PoolMetadataRest, PoolRetirementRest}, @@ -102,7 +103,14 @@ pub async fn handle_pools_extended_retired_retiring_single_blockfrost( return handle_pools_retiring_blockfrost(context.clone(), handlers_config.clone()).await } _ => match Vec::::from_bech32_with_hrp(param, "pool") { - Ok(pool_id) => return handle_pools_spo_blockfrost(context.clone(), pool_id).await, + Ok(pool_id) => { + return handle_pools_spo_blockfrost( + context.clone(), + pool_id, + handlers_config.clone(), + ) + .await + } Err(e) => { return Ok(RESTResponse::with_text( 400, @@ -208,18 +216,16 @@ async fn handle_pools_extended_blockfrost( return Ok(RESTResponse::with_json(200, "[]")); }; - // Populate pools_operators and pools_vrf_key_hashes - let pools_operators = - pools_list_with_info.iter().map(|(pool_operator, _)| pool_operator).collect::>(); - let pools_vrf_key_hashes = pools_list_with_info + // Populate pools_operators + let pools_operators = pools_list_with_info .iter() - .map(|(_, pool_registration)| pool_registration.vrf_key_hash.clone()) + .map(|(pool_operator, _)| pool_operator.clone()) .collect::>(); // Get active stake for each pool from spo-state let pools_active_stakes_msg = Arc::new(Message::StateQuery(StateQuery::Pools( PoolsStateQuery::GetPoolsActiveStakes { - pools_operators: pools_operators.iter().map(|&op| op.clone()).collect(), + pools_operators: pools_operators.clone(), epoch: latest_epoch, }, ))); @@ -248,7 +254,7 @@ async fn handle_pools_extended_blockfrost( // Get live stake for each pool from accounts-state let pools_live_stakes_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetPoolsLiveStakes { - pools_operators: pools_operators.iter().map(|&op| op.clone()).collect(), + pools_operators: pools_operators.clone(), }, ))); let pools_live_stakes_f = query_state( @@ -272,29 +278,20 @@ async fn handle_pools_extended_blockfrost( }, ); - // Get total blocks minted for each pool from epochs-state - let total_blocks_minted_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( - EpochsStateQuery::GetTotalBlocksMintedByPools { - vrf_key_hashes: pools_vrf_key_hashes, + // Get total blocks minted for each pool from spo-state + let total_blocks_minted_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolsTotalBlocksMinted { + pools_operators: pools_operators.clone(), }, ))); let total_blocks_minted_f = query_state( &context, - &handlers_config.epochs_query_topic, + &handlers_config.pools_query_topic, total_blocks_minted_msg, |message| match message { - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::TotalBlocksMintedByPools(res), - )) => Ok(res.total_blocks_minted), - - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error(e), - )) => { - return Err(anyhow::anyhow!( - "Internal server error while retrieving pools total blocks minted: {e}" - )); - } - + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolsTotalBlocksMinted(total_blocks_minted), + )) => Ok(total_blocks_minted), _ => return Err(anyhow::anyhow!("Unexpected message type")), }, ); @@ -439,10 +436,308 @@ async fn handle_pools_retiring_blockfrost( } async fn handle_pools_spo_blockfrost( - _context: Arc>, - _pool_operator: Vec, + context: Arc>, + pool_operator: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + // Get PoolRegistration from spo state + let pool_info_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolInfo { + pool_id: pool_operator.clone(), + }, + ))); + + let pool_info_f = query_state( + &context, + &handlers_config.pools_query_topic, + pool_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolInfo(pool_info), + )) => Ok(pool_info), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving pool info: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Get Latest Epoch from epochs-state + let latest_epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch_info_f = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => { + return Err(anyhow::anyhow!( + "Internal server error while retrieving latest epoch: {e}" + )); + } + _ => { + return Err(anyhow::anyhow!( + "Unexpected message type while retrieving latest epoch" + )) + } + }, + ); + + // query live stakes from accounts_state + let live_stakes_info_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetPoolLiveStake { + pool_operator: pool_operator.clone(), + }, + ))); + let live_stakes_info_f = query_state( + &context, + &handlers_config.accounts_query_topic, + live_stakes_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::PoolLiveStake(res), + )) => Ok(res), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Get optimal_pool_sizing from accounts_state + let optimal_pool_sizing_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetOptimalPoolSizing, + ))); + let optimal_pool_sizing_f = query_state( + &context, + &handlers_config.accounts_query_topic, + optimal_pool_sizing_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::OptimalPoolSizing(res), + )) => Ok(res), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Query pool update events from spo_state + let pool_updates_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolUpdates { + pool_id: pool_operator.clone(), + }, + ))); + let pool_updates_f = query_state( + &context, + &handlers_config.pools_query_topic, + pool_updates_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolUpdates(pool_updates), + )) => Ok(Some(pool_updates)), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(_e), + )) => Ok(None), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Query total_blocks_minted from spo_state + let total_blocks_minted_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolTotalBlocksMinted { + pool_id: pool_operator.clone(), + }, + ))); + let total_blocks_minted_f = query_state( + &context, + &handlers_config.pools_query_topic, + total_blocks_minted_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolTotalBlocksMinted(total_blocks_minted), + )) => Ok(total_blocks_minted), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + let ( + pool_info, + latest_epoch_info, + live_stakes_info, + optimal_pool_sizing, + pool_updates, + total_blocks_minted, + ) = join!( + pool_info_f, + latest_epoch_info_f, + live_stakes_info_f, + optimal_pool_sizing_f, + pool_updates_f, + total_blocks_minted_f, + ); + let pool_info = pool_info?; + let latest_epoch_info = latest_epoch_info?; + let latest_epoch = latest_epoch_info.epoch; + let live_stakes_info = live_stakes_info?; + let total_blocks_minted = total_blocks_minted?; + let Some(optimal_pool_sizing) = optimal_pool_sizing? else { + // if it is before Shelly Era + return Ok(RESTResponse::with_json(404, "Pool Not Found")); + }; + let pool_updates = pool_updates?; + let registrations: Option> = pool_updates.as_ref().map(|updates| { + updates + .iter() + .filter_map(|update| { + if update.action == PoolUpdateAction::Registered { + Some(update.tx_hash) + } else { + None + } + }) + .collect() + }); + let retirements: Option> = pool_updates.as_ref().map(|updates| { + updates + .iter() + .filter_map(|update| { + if update.action == PoolUpdateAction::Deregistered { + Some(update.tx_hash) + } else { + None + } + }) + .collect() + }); + + // Query blocks_minted from epochs_state + let epoch_blocks_minted_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpochBlocksMintedByPool { + vrf_key_hash: pool_info.vrf_key_hash.clone(), + }, + ))); + let epoch_blocks_minted_f = query_state( + &context, + &handlers_config.epochs_query_topic, + epoch_blocks_minted_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpochBlocksMintedByPool(blocks_minted), + )) => Ok(blocks_minted), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // query active stakes info from spo_state + let active_stakes_info_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolActiveStakeInfo { + pool_operator: pool_operator.clone(), + epoch: latest_epoch, + }, + ))); + let active_stakes_info_f = query_state( + &context, + &handlers_config.pools_query_topic, + active_stakes_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolActiveStakeInfo(res), + )) => Ok(Some(res)), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(_e), + )) => Ok(None), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Get live_pledge + // Query owner accounts balance sum from accounts_state + let live_pledge_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountsUtxoValuesSum { + stake_keys: pool_info.pool_owners.clone(), + }, + ))); + + let live_pledge_f = query_state( + &context, + &handlers_config.accounts_query_topic, + live_pledge_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountsUtxoValuesSum(res), + )) => Ok(res), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving live pledge: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + let (epoch_blocks_minted, active_stakes_info, live_pledge) = + join!(epoch_blocks_minted_f, active_stakes_info_f, live_pledge_f,); + let epoch_blocks_minted = epoch_blocks_minted?; + let active_stakes_info = active_stakes_info?; + let live_pledge = live_pledge?; + + let pool_id = pool_info.operator.to_bech32_with_hrp("pool").unwrap(); + let reward_account = pool_info.reward_account.to_bech32_with_hrp("stake"); + let Ok(reward_account) = reward_account else { + return Ok(RESTResponse::with_text(404, "Invalid Reward Account")); + }; + let pool_owners = pool_info + .pool_owners + .iter() + .map(|owner| owner.to_bech32_with_hrp("stake")) + .collect::, _>>(); + let Ok(pool_owners) = pool_owners else { + return Ok(RESTResponse::with_text(404, "Invalid Pool Owners")); + }; + let pool_info_rest: PoolInfoRest = PoolInfoRest { + pool_id, + hex: pool_info.operator, + vrf_key: pool_info.vrf_key_hash, + blocks_minted: total_blocks_minted, + blocks_epoch: epoch_blocks_minted, + live_stake: live_stakes_info.live_stake, + live_size: Decimal::from(live_stakes_info.live_stake) + / Decimal::from(live_stakes_info.total_live_stakes), + live_saturation: Decimal::from(live_stakes_info.live_stake) + * Decimal::from(optimal_pool_sizing.nopt) + / Decimal::from(optimal_pool_sizing.total_supply), + live_delegators: live_stakes_info.live_delegators, + active_stake: active_stakes_info.as_ref().map(|info| info.active_stake), + active_size: active_stakes_info + .as_ref() + .map(|info| info.active_size.to_checked_f64("active_size").unwrap_or(0.0)), + declared_pledge: pool_info.pledge, + live_pledge: live_pledge, + margin_cost: pool_info.margin.to_f32(), + fixed_cost: pool_info.cost, + reward_account, + pool_owners, + registration: registrations, + retirement: retirements, + }; + + match serde_json::to_string(&pool_info_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool info: {e}"), + )), + } } pub async fn handle_pool_history_blockfrost( @@ -752,25 +1047,168 @@ pub async fn handle_pool_delegators_blockfrost( } pub async fn handle_pool_blocks_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // Get block hashes by pool_id from spo_state + let pool_blocks_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolBlockHashes { + pool_id: spo.clone(), + }, + ))); + + let pool_blocks = query_state( + &context, + &handlers_config.pools_query_topic, + pool_blocks_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolBlockHashes(pool_blocks), + )) => Ok(pool_blocks), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(_), + )) => Err(anyhow::anyhow!("Block hashes are not enabled")), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + let pool_blocks_rest = pool_blocks.into_iter().map(|b| hex::encode(b)).collect::>(); + match serde_json::to_string(&pool_blocks_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool blocks: {e}"), + )), + } } pub async fn handle_pool_updates_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // query from spo_state + let pool_updates_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolUpdates { + pool_id: spo.clone(), + }, + ))); + let pool_updates = query_state( + &context, + &handlers_config.pools_query_topic, + pool_updates_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolUpdates(pool_updates), + )) => Ok(pool_updates), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving pool updates: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + let pool_updates_rest = pool_updates + .into_iter() + .map(|u| PoolUpdateEventRest { + tx_hash: u.tx_hash, + cert_index: u.cert_index, + action: u.action, + }) + .collect::>(); + + match serde_json::to_string(&pool_updates_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool updates: {e}"), + )), + } } pub async fn handle_pool_votes_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // query from spo_state + let pool_votes_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolVotes { + pool_id: spo.clone(), + }, + ))); + let pool_votes = query_state( + &context, + &handlers_config.pools_query_topic, + pool_votes_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolVotes(pool_votes), + )) => Ok(pool_votes), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving pool votes: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + let pool_votes_rest = pool_votes + .into_iter() + .map(|v| PoolVoteRest { + tx_hash: v.tx_hash, + vote_index: v.vote_index, + vote: v.vote, + }) + .collect::>(); + + match serde_json::to_string(&pool_votes_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool votes: {e}"), + )), + } } diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 1870ba83..ef360289 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -7,7 +7,7 @@ use acropolis_common::{ governance::DRepActionUpdate, }, rest_helper::ToCheckedF64, - PoolEpochState, Relay, Vote, + KeyHash, PoolEpochState, PoolUpdateAction, Relay, TxHash, Vote, }; use num_traits::ToPrimitive; use rust_decimal::Decimal; @@ -370,6 +370,60 @@ impl From for PoolRelayRest { } } +// REST response structure for /pools/{pool_id}/updates +#[serde_as] +#[derive(Serialize)] +pub struct PoolUpdateEventRest { + #[serde_as(as = "Hex")] + pub tx_hash: TxHash, + pub cert_index: u64, + pub action: PoolUpdateAction, +} + +// REST response structure for /pools/{pool_id}/votes +#[serde_as] +#[derive(Serialize)] +pub struct PoolVoteRest { + #[serde_as(as = "Hex")] + pub tx_hash: TxHash, + pub vote_index: u32, + pub vote: Vote, +} + +// REST response structure for /pools/{pool_id} +#[serde_as] +#[derive(Serialize)] +pub struct PoolInfoRest { + pub pool_id: String, + #[serde_as(as = "Hex")] + pub hex: KeyHash, + #[serde_as(as = "Hex")] + pub vrf_key: KeyHash, + pub blocks_minted: u64, + pub blocks_epoch: u64, + #[serde_as(as = "DisplayFromStr")] + pub live_stake: u64, + pub live_size: Decimal, + pub live_saturation: Decimal, + pub live_delegators: u64, + #[serde_as(as = "Option")] + pub active_stake: Option, + pub active_size: Option, + #[serde_as(as = "DisplayFromStr")] + pub declared_pledge: u64, + #[serde_as(as = "DisplayFromStr")] + pub live_pledge: u64, + pub margin_cost: f32, + #[serde_as(as = "DisplayFromStr")] + pub fixed_cost: u64, + pub reward_account: String, + pub pool_owners: Vec, + #[serde_as(as = "Option>")] + pub registration: Option>, + #[serde_as(as = "Option>")] + pub retirement: Option>, +} + // REST response structure for protocol params #[derive(Serialize)] pub struct ProtocolParamsRest { diff --git a/modules/spo_state/Cargo.toml b/modules/spo_state/Cargo.toml index 997f0d35..7ba53b75 100644 --- a/modules/spo_state/Cargo.toml +++ b/modules/spo_state/Cargo.toml @@ -22,6 +22,7 @@ hex = "0.4.3" imbl = { version = "5.0.0", features = ["serde"] } dashmap = "6.1.0" rayon = "1.11.0" +pallas = "0.33.0" [lib] path = "src/spo_state.rs" diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index 2bbb2fc1..757f3c30 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -13,11 +13,12 @@ use acropolis_common::{ }, rational_number::RationalNumber, state_history::{StateHistory, StateHistoryStore}, - BlockInfo, BlockStatus, + BlockInfo, BlockStatus, Era, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module, Subscription}; use config::Config; +use pallas::ledger::traverse::MultiEraHeader; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; @@ -45,6 +46,8 @@ const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = ("withdrawals-subscribe-topic", "cardano.withdrawals"); const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); +const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-header-subscribe-topic", "cardano.block.header"); const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = @@ -82,6 +85,7 @@ impl SPOState { store_config: &StoreConfig, // subscribers mut certificates_subscription: Box>, + mut block_headers_subscription: Box>, mut withdrawals_subscription: Option>>, mut governance_subscription: Option>>, mut epoch_activity_subscription: Box>, @@ -110,6 +114,7 @@ impl SPOState { // read per-block topics in parallel let certs_message_f = certificates_subscription.read(); + let block_headers_message_f = block_headers_subscription.read(); let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); @@ -117,13 +122,63 @@ impl SPOState { // Use certs_message as the synchroniser let (_, certs_message) = certs_message_f.await?; let new_epoch = match certs_message.as_ref() { - Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_certs_msg))) => { + Message::Cardano((block_info, _)) => { // Handle rollbacks on this topic only if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); } current_block = Some(block_info.clone()); + // new_epoch? + block_info.new_epoch && block_info.epoch > 0 + } + + _ => { + error!("Unexpected message type: {certs_message:?}"); + false + } + }; + + // handle Block Headers (handle_mint) before handle_tx_certs + // in case of epoch boundary + let (_, block_headers_message) = block_headers_message_f.await?; + match block_headers_message.as_ref() { + Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { + let span = + info_span!("spo_state.handle_block_header", block = block_info.number); + + span.in_scope(|| { + // Derive the variant from the era - just enough to make + // MultiEraHeader::decode() work. + let variant = match block_info.era { + Era::Byron => 0, + Era::Shelley => 1, + Era::Allegra => 2, + Era::Mary => 3, + Era::Alonzo => 4, + _ => 5, + }; + + // Parse the header - note we ignore the subtag because EBBs + // are suppressed upstream + match MultiEraHeader::decode(variant, None, &header_msg.raw) { + Ok(header) => { + if let Some(vrf_vkey) = header.vrf_vkey() { + state.handle_mint(&block_info, vrf_vkey); + } + } + + Err(e) => error!("Can't decode header {}: {e}", block_info.slot), + } + }); + } + + _ => error!("Unexpected message type: {block_headers_message:?}"), + } + + // handle tx certificates + match certs_message.as_ref() { + Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_certs_msg))) => { let span = info_span!("spo_state.handle_certs", block = block_info.number); async { Self::check_sync(¤t_block, &block_info); @@ -150,15 +205,9 @@ impl SPOState { } .instrument(span) .await; - - // new_epoch? - block_info.new_epoch && block_info.epoch > 0 } - _ => { - error!("Unexpected message type: {certs_message:?}"); - false - } + _ => error!("Unexpected message type: {certs_message:?}"), }; // read from epoch-boundary messages only when it's a new epoch @@ -388,6 +437,11 @@ impl SPOState { .unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'"); + let block_headers_subscribe_topic = config + .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating block headers subscriber on '{block_headers_subscribe_topic}'"); + let stake_reward_deltas_subscribe_topic = config .get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0) .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.1.to_string()); @@ -451,6 +505,17 @@ impl SPOState { let state = history.lock().await.get_current_state(); let response = match query { + // NOTE: + // For now, we only store active pools + // But we need to store retired pool's information also + // for BF's compatibility + PoolsStateQuery::GetPoolInfo { pool_id } => { + match state.get(pool_id) { + Some(pool) => PoolsStateQueryResponse::PoolInfo(pool.clone()), + None => PoolsStateQueryResponse::NotFound, + } + } + PoolsStateQuery::GetPoolsList => { PoolsStateQueryResponse::PoolsList(state.list_pool_operators()) } @@ -498,6 +563,12 @@ impl SPOState { } } + PoolsStateQuery::GetPoolsTotalBlocksMinted { + pools_operators + } => { + PoolsStateQueryResponse::PoolsTotalBlocksMinted(state.get_total_blocks_minted_by_pools(pools_operators)) + } + PoolsStateQuery::GetPoolHistory { pool_id } => { if epochs_history.is_enabled() { let history = @@ -527,10 +598,6 @@ impl SPOState { } PoolsStateQuery::GetPoolMetadata { pool_id } => { - // NOTE: - // we need to check retired pools metadata - // to do so, we need to save retired pool's registration - // let pool_metadata = state.get_pool_metadata(pool_id); if let Some(pool_metadata) = pool_metadata { PoolsStateQueryResponse::PoolMetadata(pool_metadata) @@ -563,28 +630,38 @@ impl SPOState { } } + PoolsStateQuery::GetPoolTotalBlocksMinted { pool_id } => { + PoolsStateQueryResponse::PoolTotalBlocksMinted(state.get_total_blocks_minted_by_pool(&pool_id)) + } + + PoolsStateQuery::GetPoolBlockHashes { pool_id } => { + if state.is_block_hashes_enabled() { + PoolsStateQueryResponse::PoolBlockHashes(state.get_pool_block_hashes(pool_id).unwrap_or_default()) + } else { + PoolsStateQueryResponse::Error("Block hashes are not enabled".into()) + } + } + PoolsStateQuery::GetPoolUpdates { pool_id } => { - let pool_updates = state.get_pool_updates(pool_id); - if let Some(pool_updates) = pool_updates { - PoolsStateQueryResponse::PoolUpdates(pool_updates) + if state.is_historical_updates_enabled() { + let pool_updates = state.get_pool_updates(pool_id); + if let Some(pool_updates) = pool_updates { + PoolsStateQueryResponse::PoolUpdates(pool_updates) + } else { + PoolsStateQueryResponse::NotFound + } } else { PoolsStateQueryResponse::Error("Pool updates are not enabled".into()) } } PoolsStateQuery::GetPoolVotes { pool_id } => { - let pool_votes = state.get_pool_votes(pool_id); - if let Some(pool_votes) = pool_votes { - PoolsStateQueryResponse::PoolVotes(pool_votes) + if state.is_historical_votes_enabled() { + PoolsStateQueryResponse::PoolVotes(state.get_pool_votes(pool_id).unwrap_or_default()) } else { - PoolsStateQueryResponse::Error("Pool Votes are not enabled".into()) + PoolsStateQueryResponse::Error("Pool votes are not enabled".into()) } } - - _ => PoolsStateQueryResponse::Error(format!( - "Unimplemented query variant: {:?}", - query - )) }; Arc::new(Message::StateQueryResponse(StateQueryResponse::Pools( @@ -639,6 +716,7 @@ impl SPOState { // Subscriptions let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?; + let block_headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; let epoch_activity_subscription = context.subscribe(&epoch_activity_subscribe_topic).await?; let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; @@ -684,6 +762,7 @@ impl SPOState { retired_pools_history, &store_config, certificates_subscription, + block_headers_subscription, withdrawals_subscription, governance_subscription, epoch_activity_subscription, diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index ed63cdf2..976faf15 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -1,6 +1,7 @@ //! Acropolis SPOState: State storage use acropolis_common::{ + crypto::keyhash, ledger_state::SPOState, messages::{ CardanoMessage, Message, SPOStateMessage, StakeAddressDeltasMessage, @@ -9,12 +10,12 @@ use acropolis_common::{ params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRegistrationWithPos, PoolRetirement, - PoolRetirementWithPos, PoolUpdateEvent, Relay, StakeCredential, TxCertificate, TxHash, Voter, - VotingProcedures, + BlockHash, BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRegistrationWithPos, + PoolRetirement, PoolRetirementWithPos, PoolUpdateEvent, Relay, StakeCredential, TxCertificate, + TxHash, Voter, VotingProcedures, }; use anyhow::Result; -use imbl::HashMap; +use imbl::{HashMap, Vector}; use std::sync::{Arc, Mutex}; use tracing::{debug, error, info}; @@ -34,7 +35,14 @@ pub struct State { pending_deregistrations: HashMap>>, /// vrf_key_hash -> pool_id mapping - vrf_key_to_pool_id_map: HashMap, Vec>, + vrf_key_hash_to_pool_id_map: HashMap, Vec>, + + // Total blocks minted till block number + // Keyed by pool_id + total_blocks_minted: HashMap, + + /// block hashes keyed pool id + block_hashes: Option>>, /// historical spo state /// keyed by pool operator id @@ -52,12 +60,18 @@ impl State { epoch: 0, spos: HashMap::new(), pending_deregistrations: HashMap::new(), - vrf_key_to_pool_id_map: HashMap::new(), + vrf_key_hash_to_pool_id_map: HashMap::new(), + total_blocks_minted: HashMap::new(), historical_spos: if config.store_historical_state() { Some(HashMap::new()) } else { None }, + block_hashes: if config.store_block_hashes { + Some(HashMap::new()) + } else { + None + }, stake_addresses: if config.store_stake_addresses { Some(Arc::new(Mutex::new(StakeAddressMap::new()))) } else { @@ -82,6 +96,10 @@ impl State { self.store_config.store_votes } + pub fn is_block_hashes_enabled(&self) -> bool { + self.store_config.store_block_hashes + } + pub fn is_stake_address_enabled(&self) -> bool { self.store_config.store_stake_addresses } @@ -90,7 +108,7 @@ impl State { impl From for State { fn from(value: SPOState) -> Self { let spos: HashMap = value.pools.into(); - let vrf_key_to_pool_id_map = + let vrf_key_hash_to_pool_id_map = spos.iter().map(|(k, v)| (v.vrf_key_hash.clone(), k.clone())).collect(); let pending_deregistrations = value.retiring.into_iter().fold(HashMap::new(), |mut acc, (key_hash, epoch)| { @@ -103,8 +121,10 @@ impl From for State { epoch: 0, spos, pending_deregistrations, - vrf_key_to_pool_id_map, + vrf_key_hash_to_pool_id_map, + total_blocks_minted: HashMap::new(), historical_spos: None, + block_hashes: None, stake_addresses: None, } } @@ -135,12 +155,20 @@ impl State { self.spos.get(pool_id) } - /// Get SPO from vrf_key_hash - pub fn get_pool_id_from_vrf_key_hash(&self, vrf_key_hash: &KeyHash) -> Option { - self.vrf_key_to_pool_id_map.get(vrf_key_hash).cloned() + /// Get total blocks minted by pools + pub fn get_total_blocks_minted_by_pools(&self, pools_operators: &Vec) -> Vec { + pools_operators + .iter() + .map(|pool_operator| *self.total_blocks_minted.get(pool_operator).unwrap_or(&0)) + .collect() } - /// Get vrf_key_to_pool_id_map + /// Get total blocks minted by pool + pub fn get_total_blocks_minted_by_pool(&self, pool_operator: &KeyHash) -> u64 { + *self.total_blocks_minted.get(pool_operator).unwrap_or(&0) + } + + /// Get (SPO, u64) from (VRF, u64) Map pub fn get_blocks_minted_by_spos( &self, vrf_key_hashes: &Vec<(KeyHash, usize)>, @@ -148,7 +176,7 @@ impl State { vrf_key_hashes .iter() .filter_map(|(vrf_key_hash, amount)| { - self.vrf_key_to_pool_id_map.get(vrf_key_hash).map(|spo| (spo.clone(), *amount)) + self.vrf_key_hash_to_pool_id_map.get(vrf_key_hash).map(|spo| (spo.clone(), *amount)) }) .collect() } @@ -191,6 +219,11 @@ impl State { delegators_map.map(|map| map.into_iter().collect()) } + /// Get Pool Blocks + pub fn get_pool_block_hashes(&self, pool_id: &KeyHash) -> Option> { + self.block_hashes.as_ref()?.get(pool_id).map(|blocks| blocks.clone().into_iter().collect()) + } + /// Get Pool Updates pub fn get_pool_updates(&self, pool_id: &KeyHash) -> Option> { self.historical_spos.as_ref()?.get(pool_id).and_then(|s| s.updates.clone()) @@ -234,6 +267,23 @@ impl State { Ok(()) } + // Handle block's minting. + // Returns None if block_hashes is not enabled + // Return Some(false) if pool_id for vrf_vkey is not found + pub fn handle_mint(&mut self, block_info: &BlockInfo, vrf_vkey: &[u8]) -> Option { + let vrf_key_hash = keyhash(vrf_vkey); + let Some(pool_id) = self.vrf_key_hash_to_pool_id_map.get(&vrf_key_hash).cloned() else { + return Some(false); + }; + + *(self.total_blocks_minted.entry(pool_id.clone()).or_insert(0)) += 1; + // if block_hashes are enabled + if let Some(block_hashes) = self.block_hashes.as_mut() { + block_hashes.entry(pool_id).or_insert_with(Vector::new).push_back(block_info.hash); + }; + Some(true) + } + fn handle_new_epoch(&mut self, block: &BlockInfo) -> Arc { self.epoch = block.epoch; debug!(epoch = self.epoch, "New epoch"); @@ -253,9 +303,9 @@ impl State { "Retirement requested for unregistered SPO {}", hex::encode(&dr), ), - Some(de_reg) => { + Some(_de_reg) => { retired_spos.push(dr.clone()); - self.vrf_key_to_pool_id_map.remove(&de_reg.vrf_key_hash); + // self.vrf_key_hash_to_pool_id_map.remove(&de_reg.vrf_key_hash); } }; } @@ -287,7 +337,7 @@ impl State { hex::encode(®.operator) ); self.spos.insert(reg.operator.clone(), reg.clone()); - self.vrf_key_to_pool_id_map.insert(reg.vrf_key_hash.clone(), reg.operator.clone()); + self.vrf_key_hash_to_pool_id_map.insert(reg.vrf_key_hash.clone(), reg.operator.clone()); // Remove any existing queued deregistrations for (epoch, deregistrations) in &mut self.pending_deregistrations.iter_mut() { @@ -626,9 +676,9 @@ mod tests { } #[test] - fn vrf_key_to_pool_id_map_is_none_on_empty_state() { + fn vrf_key_hash_to_pool_id_map_is_none_on_empty_state() { let state = State::default(); - assert!(state.vrf_key_to_pool_id_map.is_empty()); + assert!(state.vrf_key_hash_to_pool_id_map.is_empty()); } #[test] @@ -959,4 +1009,93 @@ mod tests { assert_eq!(vec![1], retiring_pools[1].operator); assert_eq!(3, retiring_pools[1].epoch); } + + #[test] + fn get_total_blocks_minted_returns_zeros_when_state_is_new() { + let state = State::default(); + assert_eq!(0, state.get_total_blocks_minted_by_pools(&vec![vec![0]])[0]); + assert_eq!(0, state.get_total_blocks_minted_by_pool(&vec![0])); + } + + #[test] + fn get_total_blocks_minted_returns_after_handle_mint() { + let mut state = State::new(&save_block_hashes_store_config()); + let mut block = new_block(0); + let mut msg = new_certs_msg(); + msg.certificates.push(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: vec![1], + vrf_key_hash: keyhash(&vec![0]), + pledge: 0, + cost: 0, + margin: Ratio { + numerator: 0, + denominator: 0, + }, + reward_account: vec![0], + pool_owners: vec![vec![0]], + relays: vec![], + pool_metadata: None, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); + assert!(state.handle_tx_certs(&block, &msg).is_ok()); + + block = new_block(2); + assert_eq!(Some(true), state.handle_mint(&block, &vec![0])); + assert_eq!(1, state.get_total_blocks_minted_by_pool(&vec![1])); + + block = new_block(3); + assert_eq!(Some(true), state.handle_mint(&block, &vec![0])); + assert_eq!(2, state.get_total_blocks_minted_by_pools(&vec![vec![1]])[0]); + } + + #[test] + fn get_block_hashes_returns_none_when_state_is_new() { + let state = State::default(); + assert!(state.get_pool_block_hashes(&vec![0]).is_none()); + } + + #[test] + fn handle_mint_returns_false_if_pool_not_found() { + let mut state = State::new(&save_block_hashes_store_config()); + let block = new_block(0); + assert_eq!(Some(false), state.handle_mint(&block, &vec![0])); + } + + #[test] + fn get_block_hashes_return_data_after_handle_mint() { + let mut state = State::new(&save_block_hashes_store_config()); + let mut block = new_block(0); + let mut msg = new_certs_msg(); + msg.certificates.push(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: vec![1], + vrf_key_hash: keyhash(&vec![0]), + pledge: 0, + cost: 0, + margin: Ratio { + numerator: 0, + denominator: 0, + }, + reward_account: vec![0], + pool_owners: vec![vec![0]], + relays: vec![], + pool_metadata: None, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); + assert!(state.handle_tx_certs(&block, &msg).is_ok()); + block = new_block(2); + assert_eq!(Some(true), state.handle_mint(&block, &vec![0])); + let block_hashes = state.get_pool_block_hashes(&vec![1]).unwrap(); + assert_eq!(block_hashes.len(), 1); + assert_eq!(block_hashes[0], block.hash); + } } diff --git a/modules/spo_state/src/store_config.rs b/modules/spo_state/src/store_config.rs index eb53a4f6..97a294a5 100644 --- a/modules/spo_state/src/store_config.rs +++ b/modules/spo_state/src/store_config.rs @@ -9,6 +9,7 @@ const DEFAULT_STORE_REGISTRATION: (&str, bool) = ("store-registration", false); const DEFAULT_STORE_UPDATES: (&str, bool) = ("store-updates", false); const DEFAULT_STORE_DELEGATORS: (&str, bool) = ("store-delegators", false); const DEFAULT_STORE_VOTES: (&str, bool) = ("store-votes", false); +const DEFAULT_STORE_BLOCK_HASHES: (&str, bool) = ("store-block-hashes", false); const DEFAULT_STORE_STAKE_ADDRESSES: (&str, bool) = ("store-stake-addresses", false); #[derive(Default, Debug, Clone, Serialize)] @@ -19,6 +20,7 @@ pub struct StoreConfig { pub store_updates: bool, pub store_delegators: bool, pub store_votes: bool, + pub store_block_hashes: bool, pub store_stake_addresses: bool, } @@ -30,6 +32,7 @@ impl StoreConfig { store_updates: bool, store_delegators: bool, store_votes: bool, + store_block_hashes: bool, store_stake_addresses: bool, ) -> Self { Self { @@ -39,6 +42,7 @@ impl StoreConfig { store_updates, store_delegators, store_votes, + store_block_hashes, store_stake_addresses, } } @@ -67,6 +71,9 @@ impl From> for StoreConfig { .get_bool(DEFAULT_STORE_DELEGATORS.0) .unwrap_or(DEFAULT_STORE_DELEGATORS.1), store_votes: config.get_bool(DEFAULT_STORE_VOTES.0).unwrap_or(DEFAULT_STORE_VOTES.1), + store_block_hashes: config + .get_bool(DEFAULT_STORE_BLOCK_HASHES.0) + .unwrap_or(DEFAULT_STORE_BLOCK_HASHES.1), store_stake_addresses: config .get_bool(DEFAULT_STORE_STAKE_ADDRESSES.0) .unwrap_or(DEFAULT_STORE_STAKE_ADDRESSES.1), diff --git a/modules/spo_state/src/test_utils.rs b/modules/spo_state/src/test_utils.rs index fe9891b3..3fc35c11 100644 --- a/modules/spo_state/src/test_utils.rs +++ b/modules/spo_state/src/test_utils.rs @@ -15,6 +15,7 @@ pub fn default_store_config() -> StoreConfig { store_updates: false, store_delegators: false, store_votes: false, + store_block_hashes: false, store_stake_addresses: false, } } @@ -27,6 +28,7 @@ pub fn save_history_store_config() -> StoreConfig { store_updates: false, store_delegators: false, store_votes: false, + store_block_hashes: false, store_stake_addresses: false, } } @@ -39,6 +41,20 @@ pub fn save_retired_pools_store_config() -> StoreConfig { store_updates: false, store_delegators: false, store_votes: false, + store_block_hashes: false, + store_stake_addresses: false, + } +} + +pub fn save_block_hashes_store_config() -> StoreConfig { + StoreConfig { + store_epochs_history: false, + store_retired_pools: false, + store_registration: false, + store_updates: false, + store_delegators: false, + store_votes: false, + store_block_hashes: true, store_stake_addresses: false, } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 049c2868..c896806f 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -44,6 +44,8 @@ store-updates = false store-delegators = false # Enables /pools/{pool_id}/votes endpoint store-votes = false +# Store block hashes, enables /pools/{pool_id}/blocks endpoints +store-block-hashes = false # Store stake_addresses store-stake-addresses = false @@ -77,8 +79,6 @@ write-full-cache = "false" [module.epochs-state] # Enables /epochs/{number} endpoint (for historical epochs) store-history = false -# Enables /pools/{pool_id}/blocks endpoint -store-block-hashes = false [module.accounts-state]