diff --git a/Cargo.lock b/Cargo.lock index e1d2257e..d6ed74db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,7 +113,7 @@ dependencies = [ ] [[package]] -name = "acropolis_module_epoch_activity_counter" +name = "acropolis_module_epochs_state" version = "0.1.0" dependencies = [ "acropolis_common", @@ -359,7 +359,7 @@ dependencies = [ "acropolis_module_block_unpacker", "acropolis_module_drdd_state", "acropolis_module_drep_state", - "acropolis_module_epoch_activity_counter", + "acropolis_module_epochs_state", "acropolis_module_genesis_bootstrapper", "acropolis_module_governance_state", "acropolis_module_mithril_snapshot_fetcher", @@ -400,7 +400,7 @@ dependencies = [ "acropolis_module_accounts_state", "acropolis_module_block_unpacker", "acropolis_module_drep_state", - "acropolis_module_epoch_activity_counter", + "acropolis_module_epochs_state", "acropolis_module_genesis_bootstrapper", "acropolis_module_governance_state", "acropolis_module_mithril_snapshot_fetcher", @@ -434,7 +434,7 @@ dependencies = [ "acropolis_module_accounts_state", "acropolis_module_block_unpacker", "acropolis_module_drep_state", - "acropolis_module_epoch_activity_counter", + "acropolis_module_epochs_state", "acropolis_module_governance_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", diff --git a/Cargo.toml b/Cargo.toml index b7cf16d7..9d8bf372 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ members = [ "modules/parameters_state", # Keeps track of protocol parameters "modules/governance_state", # Governance state "modules/stake_delta_filter", # Filters address deltas - "modules/epoch_activity_counter", # Counts fees and block producers for rewards + "modules/epochs_state", # Tracks fees and blocks minted and epochs history "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns diff --git a/README.md b/README.md index 2220cc5b..c1a1348e 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ structure is highly subject to change: * [DRep State](modules/drep_state) - tracks DRep registrations * [Governance State](modules/governance_state) - tracks Governance Actions and voting * [Stake Delta Filter](modules/stake_delta_filter) - filters out stake address changes and handles stake pointer references -* [Epoch Activity Counter](modules/epoch_activity_counter) - counts fees and block production for rewards +* [Epochs State](modules/epochs_state) - track fees blocks minted and epochs history * [Accounts State](modules/accounts_state) - stake and reward accounts tracker ```mermaid @@ -74,7 +74,7 @@ graph LR DRepState(DRep State) GovernanceState(Governance State) StakeDeltaFilter(Stake Delta Filter) - EpochActivityCounter(Epoch Activity Counter) + EpochsState(EpochsState) AccountsState(Accounts State) UpstreamChainFetcher --> BlockUnpacker @@ -82,16 +82,16 @@ graph LR BlockUnpacker --> TxUnpacker GenesisBootstrapper --> UTXOState TxUnpacker --> UTXOState - TxUnpacker --> EpochActivityCounter + TxUnpacker --> EpochsState TxUnpacker --> AccountsState TxUnpacker --> SPOState TxUnpacker --> DRepState TxUnpacker --> GovernanceState UTXOState --> StakeDeltaFilter StakeDeltaFilter --> AccountsState - UpstreamChainFetcher --> EpochActivityCounter - MithrilSnapshotFetcher --> EpochActivityCounter - EpochActivityCounter --> AccountsState + UpstreamChainFetcher --> EpochsState + MithrilSnapshotFetcher --> EpochsState + EpochsState --> AccountsState SPOState --> AccountsState DRepState --> GovernanceState GovernanceState --> AccountsState diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 5c543c49..2d4213d9 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -18,12 +18,16 @@ pub enum AccountsStateQuery { GetAccountAssets { stake_key: Vec }, GetAccountAssetsTotals { stake_key: Vec }, GetAccountUTxOs { stake_key: Vec }, + GetAccountsUtxoValuesMap { stake_keys: Vec> }, + GetAccountsUtxoValuesSum { stake_keys: Vec> }, GetAccountsBalancesMap { stake_keys: Vec> }, GetAccountsBalancesSum { stake_keys: Vec> }, // Pools related queries + GetOptimalPoolSizing, GetPoolsLiveStakes { pools_operators: Vec> }, GetPoolDelegators { pool_operator: KeyHash }, + GetPoolLiveStakeInfo { pool_operator: KeyHash }, // Dreps related queries GetAccountsDrepDelegationsMap { stake_keys: Vec> }, @@ -42,12 +46,16 @@ pub enum AccountsStateQueryResponse { AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), AccountUTxOs(AccountUTxOs), + AccountsUtxoValuesMap(HashMap, u64>), + AccountsUtxoValuesSum(u64), AccountsBalancesMap(HashMap, u64>), AccountsBalancesSum(u64), // Pools related responses + OptimalPoolSizing(OptimalPoolSizing), PoolsLiveStakes(PoolsLiveStakes), PoolDelegators(PoolDelegators), + PoolLiveStakeInfo(PoolLiveStakeInfo), // DReps related responses AccountsDrepDelegationsMap(HashMap, Option>), @@ -94,6 +102,12 @@ pub struct AccountAssetsTotals {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountUTxOs {} +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct OptimalPoolSizing { + pub total_supply: u64, // total_supply - reserves + pub nopt: u64, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolsLiveStakes { // this is in same order of pools_operator from AccountsStateQuery::GetPoolsLiveStakes @@ -104,3 +118,10 @@ pub struct PoolsLiveStakes { pub struct PoolDelegators { pub delegators: Vec<(KeyHash, u64)>, } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PoolLiveStakeInfo { + pub live_stake: u64, + pub live_delegators: u64, + pub total_live_stakes: u64, +} diff --git a/common/src/queries/epochs.rs b/common/src/queries/epochs.rs index b5715ed4..c951eb06 100644 --- a/common/src/queries/epochs.rs +++ b/common/src/queries/epochs.rs @@ -1,4 +1,4 @@ -use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, KeyHash}; +use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, BlockHash, KeyHash}; pub const DEFAULT_EPOCHS_QUERY_TOPIC: (&str, &str) = ("epochs-state-query-topic", "cardano.query.epochs"); @@ -17,6 +17,8 @@ pub enum EpochsStateQuery { // Pools related queries GetBlocksMintedByPools { vrf_key_hashes: Vec }, GetTotalBlocksMintedByPools { vrf_key_hashes: Vec }, + GetBlocksMintedInfoByPool { vrf_key_hash: KeyHash }, + GetBlockHashesByPool { vrf_key_hash: KeyHash }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -33,6 +35,8 @@ pub enum EpochsStateQueryResponse { // Pools related responses BlocksMintedByPools(BlocksMintedByPools), TotalBlocksMintedByPools(TotalBlocksMintedByPools), + BlocksMintedInfoByPool(BlocksMintedInfoByPool), + BlockHashesByPool(BlockHashesByPool), NotFound, Error(String), @@ -82,3 +86,14 @@ pub struct TotalBlocksMintedByPools { // this is in same order of vrf_key_hashes from EpochsStateQuery::TotalBlocksMinted pub total_blocks_minted: Vec, } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BlocksMintedInfoByPool { + pub total_blocks_minted: u64, + pub epoch_blocks_minted: u64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct BlockHashesByPool { + pub hashes: Vec, +} diff --git a/common/src/queries/governance.rs b/common/src/queries/governance.rs index ed0a6462..c569f41a 100644 --- a/common/src/queries/governance.rs +++ b/common/src/queries/governance.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use serde_with::{hex::Hex, serde_as}; + use crate::{ Anchor, Credential, DRepCredential, GovActionId, Lovelace, ProposalProcedure, TxHash, Vote, Voter, VotingProcedure, @@ -93,8 +95,10 @@ pub struct DRepVotes { pub votes: Vec, } +#[serde_as] #[derive(Clone, serde::Serialize, serde::Deserialize, Debug)] pub struct VoteRecord { + #[serde_as(as = "Hex")] pub tx_hash: TxHash, pub vote_index: u32, pub vote: Vote, diff --git a/common/src/queries/pools.rs b/common/src/queries/pools.rs index 4394e037..7467d8d6 100644 --- a/common/src/queries/pools.rs +++ b/common/src/queries/pools.rs @@ -1,4 +1,7 @@ -use crate::{KeyHash, PoolEpochState, PoolMetadata, PoolRegistration, PoolRetirement, Relay}; +use crate::{ + queries::governance::VoteRecord, rational_number::RationalNumber, KeyHash, PoolEpochState, + PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, +}; pub const DEFAULT_POOLS_QUERY_TOPIC: (&str, &str) = ("pools-state-query-topic", "cardano.query.pools"); @@ -9,6 +12,10 @@ pub enum PoolsStateQuery { GetPoolsListWithInfo, GetPoolsRetiredList, GetPoolsRetiringList, + GetPoolActiveStakeInfo { + pool_operator: KeyHash, + epoch: u64, + }, GetPoolsActiveStakes { pools_operators: Vec>, epoch: u64, @@ -28,9 +35,6 @@ pub enum PoolsStateQuery { GetPoolDelegators { pool_id: Vec, }, - GetPoolBlocks { - pool_id: Vec, - }, GetPoolUpdates { pool_id: Vec, }, @@ -45,13 +49,13 @@ pub enum PoolsStateQueryResponse { PoolsListWithInfo(PoolsListWithInfo), PoolsRetiredList(PoolsRetiredList), PoolsRetiringList(PoolsRetiringList), + PoolActiveStakeInfo(PoolActiveStakeInfo), PoolsActiveStakes(PoolsActiveStakes), - PoolInfo(PoolInfo), + PoolInfo(PoolRegistration), PoolHistory(PoolHistory), PoolMetadata(PoolMetadata), PoolRelays(PoolRelays), PoolDelegators(PoolDelegators), - PoolBlocks(PoolBlocks), PoolUpdates(PoolUpdates), PoolVotes(PoolVotes), NotFound, @@ -79,17 +83,18 @@ pub struct PoolsRetiringList { pub retiring_pools: Vec, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PoolActiveStakeInfo { + pub active_stake: u64, + pub active_size: RationalNumber, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolsActiveStakes { // this is in same order of pools_operator from PoolsStateQuery::GetPoolsActiveStakes pub active_stakes: Vec, - // this is total active stake for current epoch - pub total_active_stake: u64, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolInfo {} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolHistory { pub history: Vec, @@ -105,10 +110,11 @@ pub struct PoolDelegators { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolBlocks {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolUpdates {} +pub struct PoolUpdates { + pub updates: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolVotes {} +pub struct PoolVotes { + pub votes: Vec, +} diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index ba9f22cb..260e8d23 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -9,9 +9,9 @@ use std::{ }; use crate::{ - math::update_value_with_delta, messages::DRepDelegationDistribution, DRepChoice, - DRepCredential, DelegatedStake, KeyHash, Lovelace, StakeAddressDelta, StakeCredential, - Withdrawal, + math::update_value_with_delta, messages::DRepDelegationDistribution, + queries::accounts::PoolLiveStakeInfo, DRepChoice, DRepCredential, DelegatedStake, KeyHash, + Lovelace, StakeAddressDelta, StakeCredential, Withdrawal, }; use anyhow::Result; use dashmap::DashMap; @@ -110,6 +110,31 @@ impl StakeAddressMap { self.get(stake_key).map(|sas| sas.registered).unwrap_or(false) } + /// Get Pool's Live Stake Info + pub fn get_pool_live_stake_info(&self, spo: &KeyHash) -> PoolLiveStakeInfo { + let total_live_stakes = AtomicU64::new(0); + let live_stake = AtomicU64::new(0); + let live_delegators = AtomicU64::new(0); + + // Par Iter stake addresses values + self.inner.par_iter().for_each(|(_, sas)| { + total_live_stakes.fetch_add(sas.utxo_value, std::sync::atomic::Ordering::Relaxed); + if sas.delegated_spo.as_ref().map(|d_spo| d_spo.eq(spo)).unwrap_or(false) { + live_stake.fetch_add(sas.utxo_value, std::sync::atomic::Ordering::Relaxed); + live_delegators.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + }); + + let total_live_stakes = total_live_stakes.load(std::sync::atomic::Ordering::Relaxed); + let live_stake = live_stake.load(std::sync::atomic::Ordering::Relaxed); + let live_delegators = live_delegators.load(std::sync::atomic::Ordering::Relaxed); + PoolLiveStakeInfo { + live_stake, + live_delegators, + total_live_stakes, + } + } + /// Get Pool's Live Stake (same order as spos) pub fn get_pools_live_stakes(&self, spos: &Vec) -> Vec { let mut live_stakes_map = HashMap::::new(); @@ -142,7 +167,7 @@ impl StakeAddressMap { .filter_map(|(stake_key, sas)| match sas.delegated_spo.as_ref() { Some(delegated_spo) => { if delegated_spo.eq(pool_operator) { - Some((stake_key.clone(), sas.utxo_value + sas.rewards)) + Some((stake_key.clone(), sas.utxo_value)) } else { None } @@ -154,6 +179,23 @@ impl StakeAddressMap { delegators } + /// Map stake_keys to their utxo_value + /// Return None if any of the stake_keys are not found + pub fn get_accounts_utxo_values_map( + &self, + stake_keys: &[Vec], + ) -> Option, u64>> { + let mut map = HashMap::new(); + + for key in stake_keys { + let account = self.get(key)?; + let utxo_value = account.utxo_value; + map.insert(key.clone(), utxo_value); + } + + Some(map) + } + /// Map stake_keys to their total balances (utxo + rewards) /// Return None if any of the stake_keys are not found pub fn get_accounts_balances_map( @@ -188,6 +230,17 @@ impl StakeAddressMap { Some(map) } + /// Sum stake_keys utxo_values + /// Return None if any of the stake_keys are not found + pub fn get_accounts_utxo_values_sum(&self, stake_keys: &[Vec]) -> Option { + let mut total = 0; + for key in stake_keys { + let account = self.get(key)?; + total += account.utxo_value; + } + Some(total) + } + /// Sum stake_keys balances (utxo + rewards) /// Return None if any of stake_keys are not found pub fn get_account_balances_sum(&self, stake_keys: &[Vec]) -> Option { diff --git a/common/src/types.rs b/common/src/types.rs index fcde9675..62fb6bcd 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -96,7 +96,7 @@ pub struct BlockInfo { pub number: u64, /// Block hash - pub hash: Vec, + pub hash: BlockHash, /// Epoch number pub epoch: u64, @@ -296,6 +296,9 @@ pub type DataHash = Vec; /// Transaction hash pub type TxHash = [u8; 32]; +/// Block hash +pub type BlockHash = [u8; 32]; + /// Amount of Ada, in Lovelace pub type Lovelace = u64; pub type LovelaceDelta = i64; @@ -530,6 +533,14 @@ pub struct PoolMetadata { pub type RewardAccount = Vec; +/// Pool registration with position +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PoolRegistrationWithPos { + pub reg: PoolRegistration, + pub tx_hash: TxHash, + pub cert_index: u64, +} + /// Pool registration data #[serde_as] #[derive( @@ -585,6 +596,14 @@ pub struct PoolRegistration { pub pool_metadata: Option, } +// Pool Retirment with position +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct PoolRetirementWithPos { + pub ret: PoolRetirement, + pub tx_hash: TxHash, + pub cert_index: u64, +} + /// Pool retirement data #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolRetirement { @@ -596,20 +615,42 @@ pub struct PoolRetirement { } /// Pool Update Action -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum PoolUpdateAction { + #[serde(rename = "registered")] Registered, + #[serde(rename = "deregistered")] Deregistered, } /// Pool Update Event +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PoolUpdateEvent { + #[serde_as(as = "Hex")] pub tx_hash: TxHash, pub cert_index: u64, pub action: PoolUpdateAction, } +impl PoolUpdateEvent { + pub fn register_event(tx_hash: TxHash, cert_index: u64) -> Self { + Self { + tx_hash, + cert_index, + action: PoolUpdateAction::Registered, + } + } + + pub fn retire_event(tx_hash: TxHash, cert_index: u64) -> Self { + Self { + tx_hash, + cert_index, + action: PoolUpdateAction::Deregistered, + } + } +} + /// Pool Epoch History Data #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolEpochState { @@ -1346,8 +1387,11 @@ impl Display for Voter { #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)] pub enum Vote { + #[serde(rename = "no")] No, + #[serde(rename = "yes")] Yes, + #[serde(rename = "abstain")] Abstain, } @@ -1482,11 +1526,11 @@ pub enum TxCertificate { /// Stake Delegation to a pool StakeDelegation(StakeDelegation), - /// Pool registration - PoolRegistration(PoolRegistration), + /// Pool registration With position + PoolRegistrationWithPos(PoolRegistrationWithPos), /// Pool retirement - PoolRetirement(PoolRetirement), + PoolRetirementWithPos(PoolRetirementWithPos), /// Genesis key delegation GenesisKeyDelegation(GenesisKeyDelegation), diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 6b84c672..652c63f0 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -449,6 +449,24 @@ impl AccountsState { } } + AccountsStateQuery::GetOptimalPoolSizing => { + let optimal_pool_sizing = state.get_optimal_pool_sizing(); + match optimal_pool_sizing { + Some(optimal_pool_sizing) => { + AccountsStateQueryResponse::OptimalPoolSizing(optimal_pool_sizing) + } + _ => { + AccountsStateQueryResponse::Error("Not Shelly Era yet".to_string()) + } + } + } + + AccountsStateQuery::GetPoolLiveStakeInfo { pool_operator } => { + AccountsStateQueryResponse::PoolLiveStakeInfo( + state.get_pool_live_stake_info(pool_operator), + ) + } + AccountsStateQuery::GetPoolsLiveStakes { pools_operators } => { AccountsStateQueryResponse::PoolsLiveStakes(PoolsLiveStakes { live_stakes: state.get_pools_live_stakes(pools_operators), @@ -470,6 +488,24 @@ impl AccountsState { ), }, + AccountsStateQuery::GetAccountsUtxoValuesMap { stake_keys } => { + match state.get_accounts_utxo_values_map(stake_keys) { + Some(map) => AccountsStateQueryResponse::AccountsUtxoValuesMap(map), + None => AccountsStateQueryResponse::Error( + "One or more accounts not found".to_string(), + ), + } + } + + AccountsStateQuery::GetAccountsUtxoValuesSum { stake_keys } => { + match state.get_accounts_utxo_values_sum(stake_keys) { + Some(sum) => AccountsStateQueryResponse::AccountsUtxoValuesSum(sum), + None => AccountsStateQueryResponse::Error( + "One or more accounts not found".to_string(), + ), + } + } + AccountsStateQuery::GetAccountsBalancesMap { stake_keys } => { match state.get_accounts_balances_map(stake_keys) { Some(map) => AccountsStateQueryResponse::AccountsBalancesMap(map), diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index d3c44266..07da2511 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -10,6 +10,7 @@ use acropolis_common::{ WithdrawalsMessage, }, protocol_params::ProtocolParams, + queries::accounts::{OptimalPoolSizing, PoolLiveStakeInfo}, stake_addresses::{StakeAddressMap, StakeAddressState}, DRepChoice, DRepCredential, DelegatedStake, InstantaneousRewardSource, InstantaneousRewardTarget, KeyHash, Lovelace, MoveInstantaneousReward, PoolRegistration, Pot, @@ -85,6 +86,31 @@ impl State { self.pots.clone() } + /// Get maximum pool size + /// ( total_supply - reserves) / nopt (from protocol parameters) + /// Return None if it is before Shelly Era + pub fn get_optimal_pool_sizing(&self) -> Option { + // Get Shelley parameters, silently return if too early in the chain so no + // rewards to calculate + let shelly_params = match &self.protocol_parameters { + Some(ProtocolParams { + shelley: Some(sp), .. + }) => sp, + _ => return None, + } + .clone(); + + let total_supply = + shelly_params.max_lovelace_supply - self.rewards_state.mark.pots.reserves; + let nopt = shelly_params.protocol_params.stake_pool_target_num as u64; + Some(OptimalPoolSizing { total_supply, nopt }) + } + + /// Get Pool Live Stake Info + pub fn get_pool_live_stake_info(&self, pool_operator: &KeyHash) -> PoolLiveStakeInfo { + self.stake_addresses.lock().unwrap().get_pool_live_stake_info(pool_operator) + } + /// Get Pools Live stake pub fn get_pools_live_stakes(&self, pool_operators: &Vec) -> Vec { self.stake_addresses.lock().unwrap().get_pools_live_stakes(pool_operators) @@ -95,6 +121,21 @@ impl State { self.stake_addresses.lock().unwrap().get_pool_delegators(pool_operator) } + /// Map stake_keys to their utxo_values + pub fn get_accounts_utxo_values_map( + &self, + stake_keys: &[Vec], + ) -> Option, u64>> { + let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None + stake_addresses.get_accounts_utxo_values_map(stake_keys) + } + + /// Sum stake_keys utxo_values + pub fn get_accounts_utxo_values_sum(&self, stake_keys: &[Vec]) -> Option { + let stake_addresses = self.stake_addresses.lock().ok()?; // If lock fails, return None + stake_addresses.get_accounts_utxo_values_sum(stake_keys) + } + /// Map stake_keys to their total balances (utxo + rewards) pub fn get_accounts_balances_map( &self, diff --git a/modules/epoch_activity_counter/Cargo.toml b/modules/epochs_state/Cargo.toml similarity index 69% rename from modules/epoch_activity_counter/Cargo.toml rename to modules/epochs_state/Cargo.toml index d3d6ccf3..6c3b6cd6 100644 --- a/modules/epoch_activity_counter/Cargo.toml +++ b/modules/epochs_state/Cargo.toml @@ -1,11 +1,11 @@ -# Acropolis epoch activity counter module +# Acropolis epochs state module [package] -name = "acropolis_module_epoch_activity_counter" +name = "acropolis_module_epochs_state" version = "0.1.0" edition = "2021" authors = ["Paul Clark "] -description = "Epoch activity counter Caryatid module for Acropolis" +description = "Epochs State Caryatid module for Acropolis" license = "Apache-2.0" [dependencies] @@ -23,4 +23,4 @@ dashmap = "6.1.0" imbl = "6.0.0" [lib] -path = "src/epoch_activity_counter.rs" +path = "src/epochs_state.rs" diff --git a/modules/epoch_activity_counter/README.md b/modules/epochs_state/README.md similarity index 80% rename from modules/epoch_activity_counter/README.md rename to modules/epochs_state/README.md index bb499cf5..c07aa192 100644 --- a/modules/epoch_activity_counter/README.md +++ b/modules/epochs_state/README.md @@ -1,8 +1,8 @@ -# Epoch activity counter module +# Epochs state module -The epoch activity counter module accepts fee messages from the +The epochs state module accepts fee messages from the [TxUnpacker](../tx_unpacker) and totals up the fees on every -transaction in every block across an epoch. It also subscribes for +transaction in every block across an epoch. It also subscribes for block headers and records the KES keys for every block in the epoch, and sends a report at the end of the epoch that can be used by the reward calculator to allocate rewards to SPOs and thence to @@ -14,7 +14,7 @@ The following is the default configuration - if the defaults are OK, everything except the section header can be left out. ```toml -[module.epoch-activity-counter] +[module.epochs-state] # Message topics subscribe-headers-topic = "cardano.block.headers" @@ -32,5 +32,3 @@ Fetcher](../upstream_chain_fetcher) module for details). TODO subscription for fees TODO what it sends - - diff --git a/modules/epoch_activity_counter/src/epoch_activity_publisher.rs b/modules/epochs_state/src/epoch_activity_publisher.rs similarity index 100% rename from modules/epoch_activity_counter/src/epoch_activity_publisher.rs rename to modules/epochs_state/src/epoch_activity_publisher.rs diff --git a/modules/epoch_activity_counter/src/epochs_history.rs b/modules/epochs_state/src/epochs_history.rs similarity index 97% rename from modules/epoch_activity_counter/src/epochs_history.rs rename to modules/epochs_state/src/epochs_history.rs index 40f82982..57724135 100644 --- a/modules/epoch_activity_counter/src/epochs_history.rs +++ b/modules/epochs_state/src/epochs_history.rs @@ -62,7 +62,7 @@ mod tests { status: BlockStatus::Immutable, slot: 99, number: 42, - hash: Vec::new(), + hash: [0; 32], epoch, epoch_slot: 99, new_epoch: false, @@ -73,19 +73,19 @@ mod tests { #[test] fn epochs_history_is_none_when_store_history_is_false() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false, false)); assert!(epochs_history.epochs_history.is_none()); } #[test] fn epochs_history_is_some_when_store_history_is_true() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false)); assert!(epochs_history.epochs_history.is_some()); } #[test] fn handle_epoch_activity_saves_history() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); + let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false)); let block = make_block(200); epochs_history.handle_epoch_activity( &block, diff --git a/modules/epoch_activity_counter/src/epoch_activity_counter.rs b/modules/epochs_state/src/epochs_state.rs similarity index 74% rename from modules/epoch_activity_counter/src/epoch_activity_counter.rs rename to modules/epochs_state/src/epochs_state.rs index 496c1fa7..f96ea1cd 100644 --- a/modules/epoch_activity_counter/src/epoch_activity_counter.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -1,11 +1,12 @@ -//! Acropolis epoch activity counter module for Caryatid +//! Acropolis epochs state module for Caryatid //! Unpacks block bodies to get transaction fees use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::epochs::{ - BlocksMintedByPools, EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, - TotalBlocksMintedByPools, DEFAULT_EPOCHS_QUERY_TOPIC, + BlockHashesByPool, BlocksMintedByPools, BlocksMintedInfoByPool, EpochInfo, + EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, TotalBlocksMintedByPools, + DEFAULT_EPOCHS_QUERY_TOPIC, }, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, Era, @@ -29,33 +30,34 @@ use crate::{ store_config::StoreConfig, }; -const DEFAULT_SUBSCRIBE_HEADERS_TOPIC: &str = "cardano.block.header"; -const DEFAULT_SUBSCRIBE_FEES_TOPIC: &str = "cardano.block.fees"; -const DEFAULT_PUBLISH_TOPIC: &str = "cardano.epoch.activity"; -const DEFAULT_HANDLE_CURRENT_TOPIC: (&str, &str) = ("handle-topic-current-epoch", "rest.get.epoch"); -const DEFAULT_HANDLE_HISTORICAL_TOPIC: (&str, &str) = - ("handle-topic-historical-epoch", "rest.get.epochs.*"); +const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-header-subscribe-topic", "cardano.block.header"); +const DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC: (&str, &str) = + ("block-fees-subscribe-topic", "cardano.block.fees"); +const DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC: (&str, &str) = + ("epoch-activity-publish-topic", "cardano.epoch.activity"); -/// Epoch activity counter module +/// Epochs state module #[module( message_type(Message), - name = "epoch-activity-counter", - description = "Epoch activity counter" + name = "epochs-state", + description = "Epoch State" )] -pub struct EpochActivityCounter; +pub struct EpochsState; -impl EpochActivityCounter { +impl EpochsState { /// Run loop async fn run( history: Arc>>, epochs_history: EpochsHistoryState, + store_config: &StoreConfig, mut headers_subscription: Box>, mut fees_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, ) -> Result<()> { loop { // Get a mutable state - let mut state = history.lock().await.get_or_init_with(|| State::new()); + let mut state = history.lock().await.get_or_init_with(|| State::new(store_config)); let mut current_block: Option = None; // Read both topics in parallel @@ -67,7 +69,7 @@ impl EpochActivityCounter { match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockHeader(header_msg))) => { let span = info_span!( - "epoch_activity_counter.handle_block_header", + "epochs_state.handle_block_header", block = block_info.number ); @@ -130,10 +132,8 @@ impl EpochActivityCounter { let (_, message) = fees_message_f.await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockFees(fees_msg))) => { - let span = info_span!( - "epoch_activity_counter.handle_block_fees", - block = block_info.number - ); + let span = + info_span!("epochs_state.handle_block_fees", block = block_info.number); async { Self::check_sync(¤t_block, &block_info); state.handle_fees(&block_info, fees_msg.total_fees); @@ -155,31 +155,21 @@ impl EpochActivityCounter { /// Main init function pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { // Subscription topics - let subscribe_headers_topic = config - .get_string("subscribe-headers-topic") - .unwrap_or(DEFAULT_SUBSCRIBE_HEADERS_TOPIC.to_string()); - info!("Creating subscriber for headers on '{subscribe_headers_topic}'"); - - let subscribe_fees_topic = config - .get_string("subscribe-fees-topic") - .unwrap_or(DEFAULT_SUBSCRIBE_FEES_TOPIC.to_string()); - info!("Creating subscriber for fees on '{subscribe_fees_topic}'"); - - // REST handler topics - let handle_current_topic = config - .get_string(DEFAULT_HANDLE_CURRENT_TOPIC.0) - .unwrap_or(DEFAULT_HANDLE_CURRENT_TOPIC.1.to_string()); - info!("Creating request handler on '{}'", handle_current_topic); - - let handle_historical_topic = config - .get_string(DEFAULT_HANDLE_HISTORICAL_TOPIC.0) - .unwrap_or(DEFAULT_HANDLE_HISTORICAL_TOPIC.1.to_string()); - info!("Creating request handler on '{}'", handle_historical_topic); + let block_headers_subscribe_topic = config + .get_string(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for headers on '{block_headers_subscribe_topic}'"); + + let block_fees_subscribe_topic = config + .get_string(DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber for fees on '{block_fees_subscribe_topic}'"); // Publish topic - let publish_topic = - config.get_string("publish-topic").unwrap_or(DEFAULT_PUBLISH_TOPIC.to_string()); - info!("Publishing on '{publish_topic}'"); + let epoch_activity_publish_topic = config + .get_string(DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC.1.to_string()); + info!("Publishing on '{epoch_activity_publish_topic}'"); // query topic let epochs_query_topic = config @@ -192,7 +182,7 @@ impl EpochActivityCounter { // state history let history = Arc::new(Mutex::new(StateHistory::::new( - "epoch_activity_counter", + "epochs_state", StateHistoryStore::default_block_store(), ))); let history_query = history.clone(); @@ -201,12 +191,13 @@ impl EpochActivityCounter { let epochs_history = EpochsHistoryState::new(&store_config); let epochs_history_query = epochs_history.clone(); - // Publisher - let epoch_activity_publisher = EpochActivityPublisher::new(context.clone(), publish_topic); - // Subscribe - let headers_subscription = context.subscribe(&subscribe_headers_topic).await?; - let fees_subscription = context.subscribe(&subscribe_fees_topic).await?; + let headers_subscription = context.subscribe(&block_headers_subscribe_topic).await?; + let fees_subscription = context.subscribe(&block_fees_subscribe_topic).await?; + + // Publisher + let epoch_activity_publisher = + EpochActivityPublisher::new(context.clone(), epoch_activity_publish_topic); // handle epochs query context.handle(&epochs_query_topic, move |message| { @@ -216,7 +207,7 @@ impl EpochActivityCounter { async move { let Message::StateQuery(StateQuery::Epochs(query)) = message.as_ref() else { return Arc::new(Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error("Invalid message for epochs-state".into()), + EpochsStateQueryResponse::Error("Invalid message for epochs-query".into()), ))); }; @@ -255,6 +246,28 @@ impl EpochActivityCounter { ) } + EpochsStateQuery::GetBlocksMintedInfoByPool { vrf_key_hash } => { + let (total_blocks_minted, epoch_blocks_minted) = + state.get_blocks_minted_data_by_pool(vrf_key_hash); + EpochsStateQueryResponse::BlocksMintedInfoByPool(BlocksMintedInfoByPool { + total_blocks_minted, + epoch_blocks_minted, + }) + } + + EpochsStateQuery::GetBlockHashesByPool { vrf_key_hash } => { + if state.is_block_hashes_enabled() { + let hashes = state.get_block_hashes(vrf_key_hash); + EpochsStateQueryResponse::BlockHashesByPool(BlockHashesByPool { + hashes, + }) + } else { + EpochsStateQueryResponse::Error( + "Block hashes are not enabled".to_string(), + ) + } + } + _ => EpochsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query @@ -271,6 +284,7 @@ impl EpochActivityCounter { Self::run( history, epochs_history, + &store_config, headers_subscription, fees_subscription, epoch_activity_publisher, diff --git a/modules/epoch_activity_counter/src/state.rs b/modules/epochs_state/src/state.rs similarity index 69% rename from modules/epoch_activity_counter/src/state.rs rename to modules/epochs_state/src/state.rs index 47de578f..c11c4656 100644 --- a/modules/epoch_activity_counter/src/state.rs +++ b/modules/epochs_state/src/state.rs @@ -1,11 +1,17 @@ //! Acropolis epoch activity counter: state storage -use acropolis_common::{crypto::keyhash, messages::EpochActivityMessage, BlockInfo, KeyHash}; -use imbl::HashMap; -use tracing::info; +use acropolis_common::{ + crypto::keyhash, messages::EpochActivityMessage, BlockHash, BlockInfo, KeyHash, +}; +use imbl::{HashMap, Vector}; +use tracing::{error, info}; + +use crate::store_config::StoreConfig; #[derive(Default, Debug, Clone)] pub struct State { + store_config: StoreConfig, + // block number block: u64, @@ -24,29 +30,57 @@ pub struct State { // Total blocks minted till block number // Keyed by vrf_key_hash total_blocks_minted: HashMap, + + // block hashes by vrf_key_hash + block_hashes: Option>>, } impl State { // Constructor - pub fn new() -> Self { + pub fn new(store_config: &StoreConfig) -> Self { Self { + store_config: store_config.clone(), block: 0, epoch: 0, blocks_minted: HashMap::new(), epoch_blocks: 0, epoch_fees: 0, total_blocks_minted: HashMap::new(), + block_hashes: if store_config.store_block_hashes { + Some(HashMap::new()) + } else { + None + }, } } + pub fn is_block_hashes_enabled(&self) -> bool { + self.store_config.store_block_hashes + } + // Handle a block minting, taking the SPO's VRF vkey - pub fn handle_mint(&mut self, _block: &BlockInfo, vrf_vkey: Option<&[u8]>) { + pub fn handle_mint(&mut self, block: &BlockInfo, vrf_vkey: Option<&[u8]>) { self.epoch_blocks += 1; if let Some(vrf_vkey) = vrf_vkey { let vrf_key_hash = keyhash(vrf_vkey); // Count one on this hash *(self.blocks_minted.entry(vrf_key_hash.clone()).or_insert(0)) += 1; *(self.total_blocks_minted.entry(vrf_key_hash.clone()).or_insert(0)) += 1; + + if let Some(block_hashes) = self.block_hashes.as_mut() { + let block_hash: Option = block + .hash + .clone() + .try_into() + .inspect_err(|_| error!("Block hash is not 32 bytes")) + .ok(); + if let Some(block_hash) = block_hash { + block_hashes + .entry(vrf_key_hash.clone()) + .or_insert_with(Vector::new) + .push_back(block_hash); + } + } } } @@ -102,6 +136,25 @@ impl State { .map(|key_hash| self.total_blocks_minted.get(key_hash).map(|v| *v as u64).unwrap_or(0)) .collect() } + + pub fn get_blocks_minted_data_by_pool(&self, vrf_key_hash: &KeyHash) -> (u64, u64) { + ( + self.total_blocks_minted.get(vrf_key_hash).map(|v| *v as u64).unwrap_or(0), + self.blocks_minted.get(vrf_key_hash).map(|v| *v as u64).unwrap_or(0), + ) + } + + /// Get Block Hashes by Vrf Key Hash + pub fn get_block_hashes(&self, vrf_key_hash: &KeyHash) -> Vec { + let Some(block_hashes) = self.block_hashes.as_ref() else { + return vec![]; + }; + + block_hashes + .get(vrf_key_hash) + .map(|hashes| hashes.into_iter().cloned().collect()) + .unwrap_or_default() + } } #[cfg(test)] @@ -112,7 +165,7 @@ mod tests { use acropolis_common::{ crypto::keyhash, state_history::{StateHistory, StateHistoryStore}, - BlockInfo, BlockStatus, Era, + BlockHash, BlockInfo, BlockStatus, Era, }; use tokio::sync::Mutex; @@ -121,7 +174,7 @@ mod tests { status: BlockStatus::Immutable, slot: 0, number: epoch * 10, - hash: Vec::new(), + hash: BlockHash::default(), epoch, epoch_slot: 99, new_epoch: false, @@ -135,7 +188,7 @@ mod tests { status: BlockStatus::RolledBack, slot: 0, number: epoch * 10, - hash: Vec::new(), + hash: BlockHash::default(), epoch, epoch_slot: 99, new_epoch: false, @@ -146,15 +199,16 @@ mod tests { #[test] fn initial_state_is_zeroed() { - let state = State::new(); + let state = State::new(&StoreConfig::default()); assert_eq!(state.epoch_blocks, 0); assert_eq!(state.epoch_fees, 0); assert!(state.blocks_minted.is_empty()); + assert!(state.block_hashes.is_none()); } #[test] fn handle_mint_single_vrf_records_counts() { - let mut state = State::new(); + let mut state = State::new(&StoreConfig::default()); let vrf = b"vrf_key"; let mut block = make_block(100); state.handle_mint(&block, Some(vrf)); @@ -172,7 +226,7 @@ mod tests { #[test] fn handle_mint_multiple_vrf_records_counts() { - let mut state = State::new(); + let mut state = State::new(&StoreConfig::default()); let mut block = make_block(100); state.handle_mint(&block, Some(b"vrf_1")); block.number += 1; @@ -190,11 +244,34 @@ mod tests { state.blocks_minted.iter().find(|(k, _)| *k == &keyhash(b"vrf_2")).map(|(_, v)| *v), Some(2) ); + + let blocks_minted_data = state.get_blocks_minted_data_by_pool(&keyhash(b"vrf_2")); + assert_eq!(blocks_minted_data.0, 2); + assert_eq!(blocks_minted_data.1, 2); + } + + #[test] + fn store_block_hashes_after_handle_mint() { + let mut state = State::new(&StoreConfig::new(false, true)); + let mut block = make_block(100); + state.handle_mint(&block, Some(b"vrf_1")); + block.number += 1; + state.handle_mint(&block, Some(b"vrf_2")); + block.number += 1; + state.handle_mint(&block, Some(b"vrf_2")); + + let block_hashes_1 = state.get_block_hashes(&keyhash(b"vrf_1")); + assert_eq!(block_hashes_1.len(), 1); + assert_eq!(block_hashes_1[0], block.hash); + let block_hashes_2 = state.get_block_hashes(&keyhash(b"vrf_2")); + assert_eq!(block_hashes_2.len(), 2); + assert_eq!(block_hashes_2[0], block.hash); + assert_eq!(block_hashes_2[1], block.hash); } #[test] fn handle_fees_counts_fees() { - let mut state = State::new(); + let mut state = State::new(&StoreConfig::default()); let mut block = make_block(100); state.handle_fees(&block, 100); @@ -206,7 +283,7 @@ mod tests { #[test] fn end_epoch_resets_and_returns_message() { - let mut state = State::new(); + let mut state = State::new(&StoreConfig::default()); let block = make_block(1); state.handle_mint(&block, Some(b"vrf_1")); state.handle_fees(&block, 123); @@ -227,12 +304,16 @@ mod tests { assert_eq!(state.epoch_blocks, 0); assert_eq!(state.epoch_fees, 0); assert!(state.blocks_minted.is_empty()); + + let blocks_minted_data = state.get_blocks_minted_data_by_pool(&keyhash(b"vrf_1")); + assert_eq!(blocks_minted_data.0, 1); + assert_eq!(blocks_minted_data.1, 0); } #[tokio::test] async fn state_is_rolled_back() { let history = Arc::new(Mutex::new(StateHistory::::new( - "epoch_activity_counter", + "epochs_state", StateHistoryStore::default_block_store(), ))); let mut state = history.lock().await.get_current_state(); diff --git a/modules/epoch_activity_counter/src/store_config.rs b/modules/epochs_state/src/store_config.rs similarity index 51% rename from modules/epoch_activity_counter/src/store_config.rs rename to modules/epochs_state/src/store_config.rs index 409c3750..d74e51d4 100644 --- a/modules/epoch_activity_counter/src/store_config.rs +++ b/modules/epochs_state/src/store_config.rs @@ -3,16 +3,21 @@ use std::sync::Arc; use config::Config; const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false); +const DEFAULT_STORE_BLOCK_HAHSES: (&str, bool) = ("store-block-hashes", false); -#[derive(Debug, Clone)] +#[derive(Default, Debug, Clone)] pub struct StoreConfig { pub store_history: bool, + pub store_block_hashes: bool, } impl StoreConfig { #[allow(dead_code)] - pub fn new(store_history: bool) -> Self { - Self { store_history } + pub fn new(store_history: bool, store_block_hashes: bool) -> Self { + Self { + store_history, + store_block_hashes, + } } } @@ -22,6 +27,9 @@ impl From> for StoreConfig { store_history: config .get_bool(DEFAULT_STORE_HISTORY.0) .unwrap_or(DEFAULT_STORE_HISTORY.1), + store_block_hashes: config + .get_bool(DEFAULT_STORE_BLOCK_HAHSES.0) + .unwrap_or(DEFAULT_STORE_BLOCK_HAHSES.1), } } } diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 76398241..8a140f77 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -6,8 +6,8 @@ use acropolis_common::{ messages::{ CardanoMessage, GenesisCompleteMessage, Message, PotDeltasMessage, UTXODeltasMessage, }, - Address, BlockInfo, BlockStatus, ByronAddress, Era, Lovelace, LovelaceDelta, Pot, PotDelta, - TxOutput, UTXODelta, Value, + Address, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, Lovelace, LovelaceDelta, Pot, + PotDelta, TxOutput, UTXODelta, Value, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -106,7 +106,7 @@ impl GenesisBootstrapper { status: BlockStatus::Bootstrap, slot: 0, number: 0, - hash: Vec::new(), + hash: BlockHash::default(), epoch: 0, epoch_slot: 0, new_epoch: false, diff --git a/modules/governance_state/src/alonzo_babbage_voting.rs b/modules/governance_state/src/alonzo_babbage_voting.rs index 7c95be9f..bc90a45b 100644 --- a/modules/governance_state/src/alonzo_babbage_voting.rs +++ b/modules/governance_state/src/alonzo_babbage_voting.rs @@ -113,7 +113,8 @@ mod tests { use crate::alonzo_babbage_voting::AlonzoBabbageVoting; use acropolis_common::{ rational_number::rational_number_from_f32, AlonzoBabbageUpdateProposal, - AlonzoBabbageVotingOutcome, BlockInfo, BlockStatus, GenesisKeyhash, ProtocolParamUpdate, + AlonzoBabbageVotingOutcome, BlockHash, BlockInfo, BlockStatus, GenesisKeyhash, + ProtocolParamUpdate, }; use anyhow::Result; use serde_with::{base64::Base64, serde_as}; @@ -155,7 +156,7 @@ mod tests { era: era.try_into()?, new_epoch: new_epoch != 0, timestamp: 0, - hash: Vec::new(), + hash: BlockHash::default(), }; for prop in proposals { diff --git a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs index 3ad06d0e..6b4bc0ac 100644 --- a/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs +++ b/modules/mithril_snapshot_fetcher/src/mithril_snapshot_fetcher.rs @@ -324,7 +324,7 @@ impl MithrilSnapshotFetcher { status: BlockStatus::Immutable, slot, number, - hash: block.hash().to_vec(), + hash: *block.hash(), epoch, epoch_slot, new_epoch, diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index 0c497d38..237d724a 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -217,7 +217,7 @@ pub async fn handle_drep_delegators_blockfrost( } let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( - AccountsStateQuery::GetAccountsBalancesMap { stake_keys }, + AccountsStateQuery::GetAccountsUtxoValuesMap { stake_keys }, ))); let raw_msg = @@ -226,11 +226,11 @@ pub async fn handle_drep_delegators_blockfrost( match message { Message::StateQueryResponse(StateQueryResponse::Accounts( - AccountsStateQueryResponse::AccountsBalancesMap(map), + AccountsStateQueryResponse::AccountsUtxoValuesMap(map), )) => { let mut response = Vec::new(); - for (key, amount) in map { + for (key, utxo_value) in map { let Some(bech32) = stake_key_to_bech32.get(&key) else { return Ok(RESTResponse::with_text( 500, @@ -240,7 +240,7 @@ pub async fn handle_drep_delegators_blockfrost( response.push(serde_json::json!({ "address": bech32, - "amount": amount.to_string(), + "amount": utxo_value.to_string(), })); } diff --git a/modules/rest_blockfrost/src/handlers/pools.rs b/modules/rest_blockfrost/src/handlers/pools.rs index e21c869b..f2e32bd3 100644 --- a/modules/rest_blockfrost/src/handlers/pools.rs +++ b/modules/rest_blockfrost/src/handlers/pools.rs @@ -4,22 +4,23 @@ use acropolis_common::{ queries::{ accounts::{AccountsStateQuery, AccountsStateQueryResponse}, epochs::{EpochsStateQuery, EpochsStateQueryResponse}, - parameters::{ParametersStateQuery, ParametersStateQueryResponse}, pools::{PoolsStateQuery, PoolsStateQueryResponse}, utils::query_state, }, + rest_helper::ToCheckedF64, serialization::Bech32WithHrp, - PoolRetirement, StakeCredential, + PoolRetirement, PoolUpdateAction, StakeCredential, TxHash, }; use anyhow::Result; use caryatid_sdk::Context; use rust_decimal::Decimal; use std::{sync::Arc, time::Duration}; +use tokio::join; use tracing::warn; use crate::{ handlers_config::HandlersConfig, - types::{PoolDelegatorRest, PoolRelayRest}, + types::{PoolDelegatorRest, PoolInfoRest, PoolRelayRest}, }; use crate::{ types::{PoolEpochStateRest, PoolExtendedRest, PoolMetadataRest, PoolRetirementRest}, @@ -102,7 +103,14 @@ pub async fn handle_pools_extended_retired_retiring_single_blockfrost( return handle_pools_retiring_blockfrost(context.clone(), handlers_config.clone()).await } _ => match Vec::::from_bech32_with_hrp(param, "pool") { - Ok(pool_id) => return handle_pools_spo_blockfrost(context.clone(), pool_id).await, + Ok(pool_id) => { + return handle_pools_spo_blockfrost( + context.clone(), + handlers_config.clone(), + pool_id, + ) + .await + } Err(e) => { return Ok(RESTResponse::with_text( 400, @@ -194,21 +202,17 @@ async fn handle_pools_extended_blockfrost( epoch: latest_epoch, }, ))); - let (pools_active_stakes, total_active_stake) = query_state( + let pools_active_stakes = query_state( &context, &handlers_config.pools_query_topic, pools_active_stakes_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Pools( PoolsStateQueryResponse::PoolsActiveStakes(res), - )) => Ok((res.active_stakes, res.total_active_stake)), + )) => Ok(Some(res.active_stakes)), Message::StateQueryResponse(StateQueryResponse::Pools( - PoolsStateQueryResponse::Error(e), - )) => { - return Err(anyhow::anyhow!( - "Internal server error while retrieving pools active stakes: {e}" - )); - } + PoolsStateQueryResponse::Error(_), + )) => Ok(None), _ => { return Err(anyhow::anyhow!( "Unexpected message type while retrieving pools active stakes" @@ -246,7 +250,7 @@ async fn handle_pools_extended_blockfrost( ) .await?; - // Get total blocks minted for each pool from epoch-activity-counter + // Get total blocks minted for each pool from epochs_state let total_blocks_minted_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetTotalBlocksMintedByPools { vrf_key_hashes: pools_vrf_key_hashes, @@ -274,31 +278,27 @@ async fn handle_pools_extended_blockfrost( ) .await?; - // Get latest parameters from parameters-state - let latest_parameters_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( - ParametersStateQuery::GetLatestEpochParameters, + // Get optimal_pool_sizing from accounts_state + let optimal_pool_sizing_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetOptimalPoolSizing, ))); - let latest_parameters = query_state( + let optimal_pool_sizing = query_state( &context, - &handlers_config.parameters_query_topic, - latest_parameters_msg, + &handlers_config.accounts_query_topic, + optimal_pool_sizing_msg, |message| match message { - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::LatestEpochParameters(params), - )) => Ok(params), - Message::StateQueryResponse(StateQueryResponse::Parameters( - ParametersStateQueryResponse::Error(e), - )) => Err(anyhow::anyhow!( - "Internal server error while retrieving latest parameters: {e}" - )), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::OptimalPoolSizing(res), + )) => Ok(Some(res)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(_), + )) => Ok(None), _ => Err(anyhow::anyhow!("Unexpected message type")), }, ) .await?; - let Some(stake_pool_target_num) = - latest_parameters.shelley.map(|shelly| shelly.protocol_params.stake_pool_target_num) - else { - // when shelly era is not started, return empty list + let Some(optimal_pool_sizing) = optimal_pool_sizing else { + // if it is before Shelly Era return Ok(RESTResponse::with_json(500, "[]")); }; @@ -309,19 +309,18 @@ async fn handle_pools_extended_blockfrost( .map(|(i, (pool_operator, pool_registration))| { Ok(PoolExtendedRest { pool_id: pool_operator.to_bech32_with_hrp("pool")?, - hex: hex::encode(pool_operator), - active_stake: pools_active_stakes[i].to_string(), - live_stake: pools_live_stakes[i].to_string(), + hex: pool_operator.clone(), + active_stake: pools_active_stakes + .as_ref() + .map(|active_stakes| active_stakes[i]), + live_stake: pools_live_stakes[i], blocks_minted: total_blocks_minted[i], - live_saturation: if total_active_stake > 0 { - Decimal::from(pools_live_stakes[i]) * Decimal::from(stake_pool_target_num) - / Decimal::from(total_active_stake) - } else { - Decimal::from(0) - }, - declared_pledge: pool_registration.pledge.to_string(), + live_saturation: Decimal::from(pools_live_stakes[i]) + * Decimal::from(optimal_pool_sizing.nopt) + / Decimal::from(optimal_pool_sizing.total_supply), + declared_pledge: pool_registration.pledge, margin_cost: pool_registration.margin.to_f32(), - fixed_cost: pool_registration.cost.to_string(), + fixed_cost: pool_registration.cost, }) }) .collect(); @@ -434,10 +433,281 @@ async fn handle_pools_retiring_blockfrost( } async fn handle_pools_spo_blockfrost( - _context: Arc>, - _pool_operator: Vec, + context: Arc>, + handlers_config: Arc, + pool_operator: Vec, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + // query pool registration from pool state + let pool_info_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolInfo { + pool_id: pool_operator.clone(), + }, + ))); + + let pool_info_f = query_state( + &context, + &handlers_config.pools_query_topic, + pool_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolInfo(pool_info), + )) => Ok(pool_info), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving pool info: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Get Latest Epoch from epochs-state + let latest_epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch_info_f = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => { + return Err(anyhow::anyhow!( + "Internal server error while retrieving latest epoch: {e}" + )); + } + _ => { + return Err(anyhow::anyhow!( + "Unexpected message type while retrieving latest epoch" + )) + } + }, + ); + + // query live stakes info from accounts_state + let live_stakes_info_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetPoolLiveStakeInfo { + pool_operator: pool_operator.clone(), + }, + ))); + let live_stakes_info_f = query_state( + &context, + &handlers_config.accounts_query_topic, + live_stakes_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::PoolLiveStakeInfo(res), + )) => Ok(res), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Get optimal_pool_sizing from accounts_state + let optimal_pool_sizing_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetOptimalPoolSizing, + ))); + let optimal_pool_sizing_f = query_state( + &context, + &handlers_config.accounts_query_topic, + optimal_pool_sizing_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::OptimalPoolSizing(res), + )) => Ok(Some(res)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(_), + )) => Ok(None), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + // Query pool update events from spo_state + let pool_updates_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolUpdates { + pool_id: pool_operator.clone(), + }, + ))); + let pool_updates_f = query_state( + &context, + &handlers_config.pools_query_topic, + pool_updates_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolUpdates(pool_updates), + )) => Ok(Some(pool_updates.updates)), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(_e), + )) => Ok(None), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ); + + let (pool_info, latest_epoch_info, live_stakes_info, optimal_pool_sizing, pool_updates) = join!( + pool_info_f, + latest_epoch_info_f, + live_stakes_info_f, + optimal_pool_sizing_f, + pool_updates_f + ); + let pool_info = pool_info?; + let latest_epoch_info = latest_epoch_info?; + let latest_epoch = latest_epoch_info.epoch; + let live_stakes_info = live_stakes_info?; + let Some(optimal_pool_sizing) = optimal_pool_sizing? else { + // if it is before Shelly Era + return Ok(RESTResponse::with_json(404, "Pool Not Found")); + }; + let pool_updates = pool_updates?; + let registrations: Option> = pool_updates.as_ref().map(|updates| { + updates + .iter() + .filter_map(|update| { + if update.action == PoolUpdateAction::Registered { + Some(update.tx_hash) + } else { + None + } + }) + .collect() + }); + let retirements: Option> = pool_updates.as_ref().map(|updates| { + updates + .iter() + .filter_map(|update| { + if update.action == PoolUpdateAction::Deregistered { + Some(update.tx_hash) + } else { + None + } + }) + .collect() + }); + + // query blocks minted data from epochs_state + let blocks_minted_data_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetBlocksMintedInfoByPool { + vrf_key_hash: pool_info.vrf_key_hash.clone(), + }, + ))); + let blocks_minted_data = query_state( + &context, + &handlers_config.epochs_query_topic, + blocks_minted_data_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::BlocksMintedInfoByPool(res), + )) => Ok(res), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + // query active stakes info from spo_state + let active_stakes_info_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolActiveStakeInfo { + pool_operator: pool_operator.clone(), + epoch: latest_epoch, + }, + ))); + let active_stakes_info = query_state( + &context, + &handlers_config.pools_query_topic, + active_stakes_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolActiveStakeInfo(res), + )) => Ok(Some(res)), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(_e), + )) => Ok(None), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + // Get live_pledge + // Query owner accounts balance sum from accounts_state + let live_pledge_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountsUtxoValuesSum { + stake_keys: pool_info.pool_owners.clone(), + }, + ))); + + let live_pledge = query_state( + &context, + &handlers_config.accounts_query_topic, + live_pledge_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountsUtxoValuesSum(res), + )) => Ok(res), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving live pledge: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + let pool_id = pool_info.operator.to_bech32_with_hrp("pool").unwrap(); + let reward_account = pool_info.reward_account.to_bech32_with_hrp("stake"); + let Ok(reward_account) = reward_account else { + return Ok(RESTResponse::with_text(404, "Invalid Reward Account")); + }; + let pool_owners = pool_info + .pool_owners + .iter() + .map(|owner| owner.to_bech32_with_hrp("stake")) + .collect::, _>>(); + let Ok(pool_owners) = pool_owners else { + return Ok(RESTResponse::with_text(404, "Invalid Pool Owners")); + }; + let pool_info_rest: PoolInfoRest = PoolInfoRest { + pool_id, + hex: pool_info.operator, + vrf_key: pool_info.vrf_key_hash, + blocks_minted: blocks_minted_data.total_blocks_minted, + blocks_epoch: blocks_minted_data.epoch_blocks_minted, + live_stake: live_stakes_info.live_stake, + live_size: Decimal::from(live_stakes_info.live_stake) + / Decimal::from(live_stakes_info.total_live_stakes), + live_saturation: Decimal::from(live_stakes_info.live_stake) + * Decimal::from(optimal_pool_sizing.nopt) + / Decimal::from(optimal_pool_sizing.total_supply), + live_delegators: live_stakes_info.live_delegators, + active_stake: active_stakes_info.as_ref().map(|info| info.active_stake), + active_size: active_stakes_info + .as_ref() + .map(|info| info.active_size.to_checked_f64("active_size").unwrap_or(0.0)), + declared_pledge: pool_info.pledge, + live_pledge: live_pledge, + margin_cost: pool_info.margin.to_f32(), + fixed_cost: pool_info.cost, + reward_account, + pool_owners, + registration: registrations, + retirement: retirements, + }; + + match serde_json::to_string(&pool_info_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool info: {e}"), + )), + } } pub async fn handle_pool_history_blockfrost( @@ -743,25 +1013,179 @@ pub async fn handle_pool_delegators_blockfrost( } pub async fn handle_pool_blocks_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // query pool registration from pool state + let pool_info_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolInfo { + pool_id: spo.clone(), + }, + ))); + + let pool_info = query_state( + &context, + &handlers_config.pools_query_topic, + pool_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolInfo(pool_info), + )) => Ok(pool_info), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!( + "Internal server error while retrieving pool info: {e}" + )), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + let vrf_key_hash = pool_info.vrf_key_hash; + + // query block hashes by vrf key hash + // from epochs_state + let pool_blocks_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetBlockHashesByPool { vrf_key_hash }, + ))); + + let pool_blocks = query_state( + &context, + &handlers_config.epochs_query_topic, + pool_blocks_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::BlockHashesByPool(pool_blocks), + )) => Ok(pool_blocks.hashes), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(_), + )) => Err(anyhow::anyhow!("Block hashes are not enabled")), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + let pool_blocks_rest = pool_blocks.into_iter().map(|b| hex::encode(b)).collect::>(); + match serde_json::to_string(&pool_blocks_rest) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool blocks: {e}"), + )), + } } pub async fn handle_pool_updates_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // query from spo_state + let pool_updates_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolUpdates { + pool_id: spo.clone(), + }, + ))); + let pool_updates = query_state( + &context, + &handlers_config.pools_query_topic, + pool_updates_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolUpdates(pool_updates), + )) => Ok(pool_updates.updates), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!("Error: {e}")), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + match serde_json::to_string(&pool_updates) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool updates: {e}"), + )), + } } pub async fn handle_pool_votes_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Ok(RESTResponse::with_text(501, "Not implemented")) + let Some(pool_id) = params.get(0) else { + return Ok(RESTResponse::with_text(400, "Missing pool ID parameter")); + }; + + let Ok(spo) = Vec::::from_bech32_with_hrp(pool_id, "pool") else { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid Bech32 stake pool ID: {pool_id}"), + )); + }; + + // query from spo_state + let pool_votes_msg = Arc::new(Message::StateQuery(StateQuery::Pools( + PoolsStateQuery::GetPoolVotes { + pool_id: spo.clone(), + }, + ))); + let pool_votes = query_state( + &context, + &handlers_config.pools_query_topic, + pool_votes_msg, + |message: Message| match message { + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::PoolVotes(pool_votes), + )) => Ok(pool_votes.votes), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("Pool Not found")), + Message::StateQueryResponse(StateQueryResponse::Pools( + PoolsStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!("Error: {e}")), + _ => Err(anyhow::anyhow!("Unexpected message type")), + }, + ) + .await?; + + match serde_json::to_string(&pool_votes) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving pool votes: {e}"), + )), + } } diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 786d128d..e2d969a6 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -4,12 +4,13 @@ use acropolis_common::{ protocol_params::{Nonce, NonceVariant, ProtocolParams}, queries::governance::DRepActionUpdate, rest_helper::ToCheckedF64, - PoolEpochState, Relay, Vote, + KeyHash, PoolEpochState, Relay, TxHash, Vote, }; use num_traits::ToPrimitive; use rust_decimal::Decimal; use serde::Serialize; use serde_json::{json, Value}; +use serde_with::{hex::Hex, serde_as, DisplayFromStr}; use std::collections::HashMap; // REST response structure for /epoch @@ -240,17 +241,23 @@ pub struct ProposalMetadataREST { } // RET response structure for /pools/extended +#[serde_as] #[derive(Serialize)] pub struct PoolExtendedRest { pub pool_id: String, - pub hex: String, - pub active_stake: String, // u64 in string - pub live_stake: String, // u64 in string + #[serde_as(as = "Hex")] + pub hex: Vec, + #[serde_as(as = "Option")] + pub active_stake: Option, + #[serde_as(as = "DisplayFromStr")] + pub live_stake: u64, pub blocks_minted: u64, pub live_saturation: Decimal, - pub declared_pledge: String, // u64 in string + #[serde_as(as = "DisplayFromStr")] + pub declared_pledge: u64, pub margin_cost: f32, - pub fixed_cost: String, // u64 in string + #[serde_as(as = "DisplayFromStr")] + pub fixed_cost: u64, } // REST response structure for /pools/retired and /pools/retiring @@ -355,6 +362,40 @@ impl From for PoolRelayRest { } } +// REST response structure for `/pools/{pool_id}` +#[serde_as] +#[derive(Serialize)] +pub struct PoolInfoRest { + pub pool_id: String, + #[serde_as(as = "Hex")] + pub hex: KeyHash, + #[serde_as(as = "Hex")] + pub vrf_key: KeyHash, + pub blocks_minted: u64, + pub blocks_epoch: u64, + #[serde_as(as = "DisplayFromStr")] + pub live_stake: u64, + pub live_size: Decimal, + pub live_saturation: Decimal, + pub live_delegators: u64, + #[serde_as(as = "Option")] + pub active_stake: Option, + pub active_size: Option, + #[serde_as(as = "DisplayFromStr")] + pub declared_pledge: u64, + #[serde_as(as = "DisplayFromStr")] + pub live_pledge: u64, + pub margin_cost: f32, + #[serde_as(as = "DisplayFromStr")] + pub fixed_cost: u64, + pub reward_account: String, + pub pool_owners: Vec, + #[serde_as(as = "Option>")] + pub registration: Option>, + #[serde_as(as = "Option>")] + pub retirement: Option>, +} + // REST response structure for protocol params #[derive(Serialize)] pub struct ProtocolParamsRest { diff --git a/modules/spo_state/src/aggregated_state.rs b/modules/spo_state/src/aggregated_state.rs deleted file mode 100644 index 46493ad3..00000000 --- a/modules/spo_state/src/aggregated_state.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use acropolis_common::{messages::SPOStakeDistributionMessage, BlockInfo, KeyHash}; -use dashmap::DashMap; -use rayon::prelude::*; -use serde::Serialize; -use tracing::error; - -// Aggregated SPO State by epoch N-1 (when current epoch is N) -// Active Stakes and total blocks minted count -#[derive(Clone)] -pub struct AggregatedSPOState { - /// Active stakes for each pool operator - /// (epoch number, active stake) - /// Remove elements when epoch number is less than current epoch number - pub active_stakes: Arc>>, -} - -#[derive(Default, Debug, Clone, Serialize)] -pub struct TotalBlocksMintedState { - /// block number of Epoch Boundary from N-1 to N - block: u64, -} - -impl AggregatedSPOState { - pub fn new() -> Self { - Self { - active_stakes: Arc::new(DashMap::new()), - } - } - - /// Get Pools Active Stakes by epoch and total active stake - /// ## Arguments - /// * `pools_operators` - A vector of pool operator hashes - /// * `epoch` - The epoch to get the active stakes for - /// ## Returns - /// `(Vec, u64)` - a vector of active stakes for each pool operator and the total active stake. - pub fn get_pools_active_stakes( - &self, - pools_operators: &Vec, - epoch: u64, - ) -> (Vec, u64) { - let active_stakes = pools_operators - .par_iter() - .map(|spo| self.get_active_stake(spo, epoch).unwrap_or(0)) - .collect::>(); - let total_active_stake = self.get_total_active_stake(epoch); - (active_stakes, total_active_stake) - } - - fn get_active_stake(&self, spo: &KeyHash, epoch: u64) -> Option { - self.active_stakes.get(spo).map(|stakes| stakes.get(&epoch).cloned()).flatten() - } - - fn get_total_active_stake(&self, epoch: u64) -> u64 { - self.active_stakes.iter().map(|entry| entry.value().get(&epoch).cloned().unwrap_or(0)).sum() - } - - /// Handle SPO Stake Distribution - /// Live stake snapshots taken at Epoch N - 1 to N boundary (Mark at Epoch N) - /// Active stake is valid from Epoch N + 1 (Set at Epoch N + 1) - /// - pub fn handle_spdd(&self, block: &BlockInfo, spdd_message: &SPOStakeDistributionMessage) { - let SPOStakeDistributionMessage { epoch, spos } = spdd_message; - if *epoch != block.epoch - 1 { - error!( - "SPO Stake Distribution Message's epoch {} is wrong against current block's epoch {}", - *epoch, block.epoch - ) - } - let epoch_to_update = *epoch + 2; - - // update active stakes - spos.par_iter().for_each(|(spo, value)| { - let mut active_stakes = self - .active_stakes - .entry(spo.clone()) - .and_modify(|stakes| stakes.retain(|k, _| *k >= block.epoch)) - .or_insert_with(BTreeMap::new); - - active_stakes.insert(epoch_to_update, value.active); - }); - } -} - -#[cfg(test)] -mod tests { - use acropolis_common::DelegatedStake; - - use super::*; - use crate::test_utils::*; - - #[tokio::test] - async fn new_state_returns_zeros() { - let aggregated_state = AggregatedSPOState::new(); - assert!(aggregated_state.active_stakes.is_empty()); - } - - #[test] - fn active_stakes_not_empty_after_handle_spdd() { - let aggregated_state = AggregatedSPOState::new(); - let block = new_block(2); - let mut msg = new_spdd_message(1); - msg.spos = vec![ - ( - vec![1], - DelegatedStake { - active: 1, - active_delegators_count: 1, - live: 1, - }, - ), - ( - vec![2], - DelegatedStake { - active: 2, - active_delegators_count: 2, - live: 2, - }, - ), - ]; - aggregated_state.handle_spdd(&block, &msg); - let (active_stakes, total_active_stake) = - aggregated_state.get_pools_active_stakes(&vec![vec![1], vec![2]], 3); - assert_eq!(2, active_stakes.len()); - assert_eq!(1, active_stakes[0]); - assert_eq!(2, active_stakes[1]); - assert_eq!(3, total_active_stake); - } -} diff --git a/modules/spo_state/src/epochs_history.rs b/modules/spo_state/src/epochs_history.rs index ab98a1ea..1319b099 100644 --- a/modules/spo_state/src/epochs_history.rs +++ b/modules/spo_state/src/epochs_history.rs @@ -19,19 +19,19 @@ use crate::store_config::StoreConfig; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct EpochState { /// epoch number N - epoch: u64, + pub epoch: u64, /// blocks minted during the epoch - blocks_minted: Option, + pub blocks_minted: Option, /// active stake of the epoch N (taken boundary from epoch N-2 to N-1) - active_stake: Option, + pub active_stake: Option, /// active size = active_stake / total_active_stake - active_size: Option, + pub active_size: Option, /// delegators count by the end of the epoch - delegators_count: Option, + pub delegators_count: Option, /// Total rewards pool has received during epoch - pool_reward: Option, + pub pool_reward: Option, /// pool's operator's reward - spo_reward: Option, + pub spo_reward: Option, } impl EpochState { @@ -96,6 +96,26 @@ impl EpochsHistoryState { .map(|epochs| epochs.values().map(|state| state.to_pool_epoch_state()).collect()) } + /// Get Pools Active stakes + /// Return None if any of pool operators active stake is None + pub fn get_pools_active_stakes( + &self, + pool_operators: &Vec, + epoch: u64, + ) -> Option> { + let Some(epochs_history) = self.epochs_history.as_ref() else { + return None; + }; + + let mut active_stakes = Vec::::new(); + for pool_operator in pool_operators { + let epochs = epochs_history.get(pool_operator)?; + let epoch_state = epochs.get(&epoch)?; + active_stakes.push(epoch_state.active_stake?); + } + Some(active_stakes) + } + /// Handle SPO Stake Distribution /// Update epochs_history with active_stake (for spdd_message.epoch + 2) /// diff --git a/modules/spo_state/src/historical_spo_state.rs b/modules/spo_state/src/historical_spo_state.rs index cddec481..4a559f73 100644 --- a/modules/spo_state/src/historical_spo_state.rs +++ b/modules/spo_state/src/historical_spo_state.rs @@ -1,8 +1,7 @@ -use std::collections::HashSet; - use acropolis_common::{ queries::governance::VoteRecord, KeyHash, PoolRegistration, PoolUpdateEvent, }; +use imbl::HashSet; use serde::{Deserialize, Serialize}; use crate::store_config::StoreConfig; @@ -31,17 +30,35 @@ impl HistoricalSPOState { } } + pub fn add_pool_registration(&mut self, reg: &PoolRegistration) -> Option { + // update registration if enabled + let Some(registration) = self.registration.as_mut() else { + return None; + }; + *registration = reg.clone(); + Some(true) + } + + pub fn add_pool_updates(&mut self, update: PoolUpdateEvent) -> Option { + // update updates if enabled + let Some(updates) = self.updates.as_mut() else { + return None; + }; + updates.push(update); + Some(true) + } + pub fn add_delegator(&mut self, delegator: &KeyHash) -> Option { let Some(delegators) = self.delegators.as_mut() else { return None; }; - Some(delegators.insert(delegator.clone())) + Some(delegators.insert(delegator.clone()).is_some()) } pub fn remove_delegator(&mut self, delegator: &KeyHash) -> Option { let Some(delegators) = self.delegators.as_mut() else { return None; }; - Some(delegators.remove(delegator)) + Some(delegators.remove(delegator).is_some()) } } diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index ae459dcc..b7538f43 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -8,10 +8,11 @@ use acropolis_common::{ SnapshotStateMessage, StateQuery, StateQueryResponse, }, queries::pools::{ - PoolDelegators, PoolHistory, PoolRelays, PoolsActiveStakes, PoolsList, PoolsListWithInfo, - PoolsRetiredList, PoolsRetiringList, PoolsStateQuery, PoolsStateQueryResponse, - DEFAULT_POOLS_QUERY_TOPIC, + PoolActiveStakeInfo, PoolDelegators, PoolHistory, PoolRelays, PoolUpdates, PoolVotes, + PoolsActiveStakes, PoolsList, PoolsListWithInfo, PoolsRetiredList, PoolsRetiringList, + PoolsStateQuery, PoolsStateQueryResponse, DEFAULT_POOLS_QUERY_TOPIC, }, + rational_number::RationalNumber, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, }; @@ -22,7 +23,6 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; -mod aggregated_state; mod epochs_history; mod historical_spo_state; mod retired_pools_history; @@ -33,21 +33,38 @@ mod store_config; mod test_utils; use crate::{ - aggregated_state::AggregatedSPOState, epochs_history::EpochsHistoryState, - retired_pools_history::RetiredPoolsHistoryState, spo_state_publisher::SPOStatePublisher, + epochs_history::EpochsHistoryState, retired_pools_history::RetiredPoolsHistoryState, + spo_state_publisher::SPOStatePublisher, }; use state::State; use store_config::StoreConfig; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.certificates"; -const DEFAULT_WITHDRAWALS_TOPIC: &str = "cardano.withdrawals"; -const DEFAULT_STAKE_DELTAS_TOPIC: &str = "cardano.stake.deltas"; -const DEFAULT_CLOCK_TICK_TOPIC: &str = "clock.tick"; -const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; -const DEFAULT_SPDD_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution"; -const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity"; -const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards"; -const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; +// Subscribe Topics +const DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC: (&str, &str) = + ("certificates-subscribe-topic", "cardano.certificates"); +const DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC: (&str, &str) = + ("withdrawals-subscribe-topic", "cardano.withdrawals"); +const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = + ("governance-subscribe-topic", "cardano.governance"); +const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = + ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); +const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = + ("spdd-subscribe-topic", "cardano.spo.distribution"); +const DEFAULT_STAKE_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) = + ("stake-deltas-subscribe-topic", "cardano.stake.deltas"); +const DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC: (&str, &str) = + ("spo-rewards-subscribe-topic", "cardano.spo.rewards"); +const DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) = ( + "stake-reward-deltas-subscribe-topic", + "cardano.stake.reward.deltas", +); +const DEFAULT_CLOCK_TICK_SUBSCRIBE_TOPIC: (&str, &str) = + ("clock-tick-subscribe-topic", "clock.tick"); +const MAYBE_SNAPSHOT_SUBSCRIBE_TOPIC: &str = "snapshot-subscribe-topic"; + +// Publish Topics +const DEFAULT_SPO_STATE_PUBLISH_TOPIC: (&str, &str) = + ("publish-spo-state-topic", "cardano.spo.state"); /// SPO State module #[module( @@ -61,17 +78,19 @@ impl SPOState { /// Main async run loop async fn run( history: Arc>>, - aggregated_state: AggregatedSPOState, epochs_history: EpochsHistoryState, retired_pools_history: RetiredPoolsHistoryState, store_config: &StoreConfig, - mut certs_subscription: Box>, - mut stake_deltas_subscription: Option>>, + // subscribers + mut certificates_subscription: Box>, mut withdrawals_subscription: Option>>, - mut stake_reward_deltas_subscription: Option>>, - mut spdd_subscription: Box>, - mut spo_rewards_subscription: Box>, + mut governance_subscription: Option>>, mut epoch_activity_subscription: Box>, + mut spdd_subscription: Box>, + mut stake_deltas_subscription: Option>>, + mut spo_rewards_subscription: Option>>, + mut stake_reward_deltas_subscription: Option>>, + // publishers mut spo_state_publisher: SPOStatePublisher, ) -> Result<()> { // Get the stake address deltas from the genesis bootstrap, which we know @@ -91,9 +110,10 @@ impl SPOState { let mut current_block: Option = None; // read per-block topics in parallel - let certs_message_f = certs_subscription.read(); - let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); + let certs_message_f = certificates_subscription.read(); let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); + let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); + let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); // Use certs_message as the synchroniser let (_, certs_message) = certs_message_f.await?; @@ -145,7 +165,7 @@ impl SPOState { // read from epoch-boundary messages only when it's a new epoch if new_epoch { let spdd_message_f = spdd_subscription.read(); - let spo_rewards_message_f = spo_rewards_subscription.read(); + let spo_rewards_message_f = spo_rewards_subscription.as_mut().map(|s| s.read()); let ea_message_f = epoch_activity_subscription.read(); let stake_reward_deltas_message_f = stake_reward_deltas_subscription.as_mut().map(|s| s.read()); @@ -160,27 +180,27 @@ impl SPOState { let span = info_span!("spo_state.handle_spdd", block = block_info.number); span.in_scope(|| { Self::check_sync(¤t_block, &block_info); - // update aggregated state - aggregated_state.handle_spdd(block_info, spdd_message); // update epochs_history epochs_history.handle_spdd(block_info, spdd_message); }); } // Handle SPO rewards - let (_, spo_rewards_message) = spo_rewards_message_f.await?; - if let Message::Cardano(( - block_info, - CardanoMessage::SPORewards(spo_rewards_message), - )) = spo_rewards_message.as_ref() - { - let span = - info_span!("spo_state.handle_spo_rewards", block = block_info.number); - span.in_scope(|| { - Self::check_sync(¤t_block, &block_info); - // update epochs_history - epochs_history.handle_spo_rewards(block_info, spo_rewards_message); - }); + if let Some(spo_rewards_message_f) = spo_rewards_message_f { + let (_, spo_rewards_message) = spo_rewards_message_f.await?; + if let Message::Cardano(( + block_info, + CardanoMessage::SPORewards(spo_rewards_message), + )) = spo_rewards_message.as_ref() + { + let span = + info_span!("spo_state.handle_spo_rewards", block = block_info.number); + span.in_scope(|| { + Self::check_sync(¤t_block, &block_info); + // update epochs_history + epochs_history.handle_spo_rewards(block_info, spo_rewards_message); + }); + } } // Handle Stake Reward Deltas @@ -280,6 +300,29 @@ impl SPOState { } } + // Handle governance + if let Some(governance_message_f) = governance_message_f { + let (_, message) = governance_message_f.await?; + match message.as_ref() { + Message::Cardano(( + block_info, + CardanoMessage::GovernanceProcedures(governance_msg), + )) => { + let span = + info_span!("spo_state.handle_governance", block = block_info.number); + span.in_scope(|| { + Self::check_sync(¤t_block, &block_info); + state + .handle_governance(&governance_msg.voting_procedures) + .inspect_err(|e| error!("Governance handling error: {e:#}")) + .ok(); + }); + } + + _ => error!("Unexpected message type: {message:?}"), + } + } + // Commit the new state if let Some(block_info) = current_block { history.lock().await.commit(block_info.number, state); @@ -311,50 +354,61 @@ impl SPOState { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { // Get configuration - let subscribe_topic = - config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); - info!("Creating subscriber on '{subscribe_topic}'"); - - let withdrawals_topic = - config.get_string("withdrawals-topic").unwrap_or(DEFAULT_WITHDRAWALS_TOPIC.to_string()); - info!("Creating withdrawals subscriber on '{withdrawals_topic}'"); - - let stake_deltas_topic = config - .get_string("stake-deltas-topic") - .unwrap_or(DEFAULT_STAKE_DELTAS_TOPIC.to_string()); - info!("Creating stake deltas subscriber on '{stake_deltas_topic}'"); - - let clock_tick_topic = - config.get_string("clock-tick-topic").unwrap_or(DEFAULT_CLOCK_TICK_TOPIC.to_string()); - info!("Creating subscriber on '{clock_tick_topic}'"); - - let spdd_topic = - config.get_string("spdd-topic").unwrap_or(DEFAULT_SPDD_SUBSCRIBE_TOPIC.to_string()); - info!("Creating subscriber on '{spdd_topic}'"); - - let epoch_activity_topic = config - .get_string("epoch-activity-topic") - .unwrap_or(DEFAULT_EPOCH_ACTIVITY_TOPIC.to_string()); - info!("Creating subscriber on '{epoch_activity_topic}'"); - - let spo_rewards_topic = - config.get_string("spo-rewards-topic").unwrap_or(DEFAULT_SPO_REWARDS_TOPIC.to_string()); - info!("Creating SPO rewards publisher on '{spo_rewards_topic}'"); - - let stake_reward_deltas_topic = config - .get_string("stake-reward-deltas-topic") - .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); - info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'"); + let certificates_subscribe_topic = config + .get_string(DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber on '{certificates_subscribe_topic}'"); + + let withdrawals_subscribe_topic = config + .get_string(DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_WITHDRAWALS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating withdrawals subscriber on '{withdrawals_subscribe_topic}'"); + + let governance_subscribe_topic = config + .get_string(DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating governance subscriber on '{governance_subscribe_topic}'"); + + let stake_deltas_subscribe_topic = config + .get_string(DEFAULT_STAKE_DELTAS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_STAKE_DELTAS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating stake deltas subscriber on '{stake_deltas_subscribe_topic}'"); + + let epoch_activity_subscribe_topic = config + .get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber on '{epoch_activity_subscribe_topic}'"); + + let spdd_subscribe_topic = config + .get_string(DEFAULT_SPDD_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_SPDD_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber on '{spdd_subscribe_topic}'"); + + let spo_rewards_subscribe_topic = config + .get_string(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_SPO_REWARDS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating SPO rewards subscriber on '{spo_rewards_subscribe_topic}'"); + + let stake_reward_deltas_subscribe_topic = config + .get_string(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_subscribe_topic}'"); + + let clock_tick_subscribe_topic = config + .get_string(DEFAULT_CLOCK_TICK_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_CLOCK_TICK_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating subscriber on '{clock_tick_subscribe_topic}'"); let maybe_snapshot_topic = config - .get_string("snapshot-topic") + .get_string(MAYBE_SNAPSHOT_SUBSCRIBE_TOPIC) .ok() .inspect(|snapshot_topic| info!("Creating subscriber on '{snapshot_topic}'")); - let spo_state_topic = config - .get_string("publish-spo-state-topic") - .unwrap_or(DEFAULT_SPO_STATE_TOPIC.to_string()); - info!("Creating SPO state publisher on '{spo_state_topic}'"); + // Publish Topics + let spo_state_publish_topic = config + .get_string(DEFAULT_SPO_STATE_PUBLISH_TOPIC.0) + .unwrap_or(DEFAULT_SPO_STATE_PUBLISH_TOPIC.1.to_string()); + info!("Creating SPO state publisher on '{spo_state_publish_topic}'"); // query topic let pools_query_topic = config @@ -374,10 +428,6 @@ impl SPOState { let history_tick = history.clone(); let history_snapshot = history.clone(); - // Create Aggregated State - let aggregated_state = AggregatedSPOState::new(); - let aggregated_state_spo_state = aggregated_state.clone(); - // Create epochs history let epochs_history = EpochsHistoryState::new(store_config.clone()); let epochs_history_spo_state = epochs_history.clone(); @@ -389,7 +439,6 @@ impl SPOState { // handle pools-state query context.handle(&pools_query_topic, move |message| { let history = history_spo_state.clone(); - let aggregated_state = aggregated_state_spo_state.clone(); let epochs_history = epochs_history_spo_state.clone(); let retired_pools_history = retired_pools_history_spo_state.clone(); @@ -415,28 +464,38 @@ impl SPOState { }; PoolsStateQueryResponse::PoolsListWithInfo(pools_list_with_info) } + PoolsStateQuery::GetPoolInfo { pool_id } => { + let pool_info = state.get(pool_id); + if let Some(pool_info) = pool_info { + PoolsStateQueryResponse::PoolInfo(pool_info.clone()) + } else { + PoolsStateQueryResponse::NotFound + } + } + + PoolsStateQuery::GetPoolActiveStakeInfo { pool_operator, epoch } => { + if epochs_history.is_enabled() { + let epoch_state = epochs_history.get_epoch_state(pool_operator, *epoch); + PoolsStateQueryResponse::PoolActiveStakeInfo(PoolActiveStakeInfo { + active_stake: epoch_state.as_ref().and_then(|state| state.active_stake).unwrap_or(0), + active_size: epoch_state.as_ref().and_then(|state| state.active_size).unwrap_or(RationalNumber::from(0)), + }) + } else { + PoolsStateQueryResponse::Error("Epochs history is not enabled".into()) + } + }, PoolsStateQuery::GetPoolsActiveStakes { pools_operators, epoch, } => { - let (active_stakes, total_active_stake) = - aggregated_state.get_pools_active_stakes(pools_operators, *epoch); - PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes { - active_stakes, - total_active_stake, - }) - } - - PoolsStateQuery::GetPoolHistory { pool_id } => { if epochs_history.is_enabled() { - let history = - epochs_history.get_pool_history(pool_id).unwrap_or(Vec::new()); - PoolsStateQueryResponse::PoolHistory(PoolHistory { history }) + let active_stakes = epochs_history.get_pools_active_stakes(pools_operators, *epoch); + PoolsStateQueryResponse::PoolsActiveStakes(PoolsActiveStakes { + active_stakes: active_stakes.unwrap_or(vec![0; pools_operators.len()]), + }) } else { - PoolsStateQueryResponse::Error( - "Pool Epoch history is not enabled".into(), - ) + PoolsStateQueryResponse::Error("Epochs history is not enabled".into()) } } @@ -460,6 +519,18 @@ impl SPOState { } } + PoolsStateQuery::GetPoolHistory { pool_id } => { + if epochs_history.is_enabled() { + let history = + epochs_history.get_pool_history(pool_id).unwrap_or(Vec::new()); + PoolsStateQueryResponse::PoolHistory(PoolHistory { history }) + } else { + PoolsStateQueryResponse::Error( + "Pool Epoch history is not enabled".into(), + ) + } + } + PoolsStateQuery::GetPoolMetadata { pool_id } => { // NOTE: // we need to check retired pools metadata @@ -473,6 +544,15 @@ impl SPOState { } } + PoolsStateQuery::GetPoolRelays { pool_id } => { + let pool_relays = state.get_pool_relays(pool_id); + if let Some(relays) = pool_relays { + PoolsStateQueryResponse::PoolRelays(PoolRelays { relays }) + } else { + PoolsStateQueryResponse::NotFound + } + } + PoolsStateQuery::GetPoolDelegators { pool_id } => { if state.is_historical_delegators_enabled() && state.is_stake_address_enabled() { let pool_delegators = state.get_pool_delegators(pool_id); @@ -488,19 +568,35 @@ impl SPOState { } } - PoolsStateQuery::GetPoolRelays { pool_id } => { - let pool_relays = state.get_pool_relays(pool_id); - if let Some(relays) = pool_relays { - PoolsStateQueryResponse::PoolRelays(PoolRelays { relays }) + PoolsStateQuery::GetPoolUpdates { pool_id } => { + if state.is_historical_updates_enabled() { + let pool_updates = state.get_pool_updates(pool_id); + if let Some(pool_updates) = pool_updates { + PoolsStateQueryResponse::PoolUpdates(PoolUpdates { + updates: pool_updates, + }) + } else { + PoolsStateQueryResponse::NotFound + } } else { - PoolsStateQueryResponse::NotFound + PoolsStateQueryResponse::Error("Pool updates are not enabled".into()) } } - _ => PoolsStateQueryResponse::Error(format!( - "Unimplemented query variant: {:?}", - query - )), + PoolsStateQuery::GetPoolVotes { pool_id } => { + if state.is_historical_votes_enabled() { + let pool_votes = state.get_pool_votes(pool_id); + if let Some(pool_votes) = pool_votes { + PoolsStateQueryResponse::PoolVotes(PoolVotes { + votes: pool_votes, + }) + } else { + PoolsStateQueryResponse::NotFound + } + } else { + PoolsStateQueryResponse::Error("Pool updates are not enabled".into()) + } + } }; Arc::new(Message::StateQueryResponse(StateQueryResponse::Pools( @@ -553,46 +649,59 @@ impl SPOState { }); } - // Publishers - let spo_state_publisher = SPOStatePublisher::new(context.clone(), spo_state_topic); - // Subscriptions - let certs_subscription = context.subscribe(&subscribe_topic).await?; - let stake_deltas_subscription = if store_config.store_stake_addresses { - Some(context.subscribe(&stake_deltas_topic).await?) + let certificates_subscription = context.subscribe(&certificates_subscribe_topic).await?; + // only when stake_addresses are enabled + let withdrawals_subscription = if store_config.store_stake_addresses { + Some(context.subscribe(&withdrawals_subscribe_topic).await?) } else { None }; - let withdrawals_subscription = if store_config.store_stake_addresses { - Some(context.subscribe(&withdrawals_topic).await?) + // when historical spo's votes are enabled + let governance_subscription = if store_config.store_votes { + Some(context.subscribe(&governance_subscribe_topic).await?) + } else { + None + }; + let epoch_activity_subscription = + context.subscribe(&epoch_activity_subscribe_topic).await?; + let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; + // when epochs_history is enabled + let spo_rewards_subscription = if store_config.store_epochs_history { + Some(context.subscribe(&spo_rewards_subscribe_topic).await?) + } else { + None + }; + // when state_addresses are enabled + let stake_deltas_subscription = if store_config.store_stake_addresses { + Some(context.subscribe(&stake_deltas_subscribe_topic).await?) } else { None }; let stake_reward_deltas_subscription = if store_config.store_stake_addresses { - Some(context.subscribe(&stake_reward_deltas_topic).await?) + Some(context.subscribe(&stake_reward_deltas_subscribe_topic).await?) } else { None }; + let clock_tick_subscription = context.subscribe(&clock_tick_subscribe_topic).await?; - let spdd_subscription = context.subscribe(&spdd_topic).await?; - let spo_rewards_subscription = context.subscribe(&spo_rewards_topic).await?; - let epoch_activity_subscription = context.subscribe(&epoch_activity_topic).await?; - let clock_tick_subscription = context.subscribe(&clock_tick_topic).await?; + // Publishers + let spo_state_publisher = SPOStatePublisher::new(context.clone(), spo_state_publish_topic); context.run(async move { Self::run( history, - aggregated_state, epochs_history, retired_pools_history, &store_config, - certs_subscription, - stake_deltas_subscription, + certificates_subscription, withdrawals_subscription, - stake_reward_deltas_subscription, + governance_subscription, + epoch_activity_subscription, spdd_subscription, + stake_deltas_subscription, spo_rewards_subscription, - epoch_activity_subscription, + stake_reward_deltas_subscription, spo_state_publisher, ) .await diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 3e41c273..09c9f6c9 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -7,9 +7,11 @@ use acropolis_common::{ StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, + queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRetirement, Relay, StakeCredential, - TxCertificate, + BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRegistrationWithPos, PoolRetirement, + PoolRetirementWithPos, PoolUpdateEvent, Relay, StakeCredential, TxCertificate, TxHash, Voter, + VotingProcedures, }; use anyhow::Result; use imbl::HashMap; @@ -72,6 +74,14 @@ impl State { self.store_config.store_delegators } + pub fn is_historical_updates_enabled(&self) -> bool { + self.store_config.store_updates + } + + pub fn is_historical_votes_enabled(&self) -> bool { + self.store_config.store_votes + } + pub fn is_stake_address_enabled(&self) -> bool { self.store_config.store_stake_addresses } @@ -182,6 +192,28 @@ impl State { Some(delegators_with_live_stakes) } + /// Get Pool Updates + pub fn get_pool_updates(&self, pool_id: &KeyHash) -> Option> { + let Some(historical_spos) = self.historical_spos.as_ref() else { + return None; + }; + + let updates: Option> = + historical_spos.get(pool_id).map(|s| s.updates.clone()).flatten(); + updates + } + + /// Get Pool Votes + pub fn get_pool_votes(&self, pool_id: &KeyHash) -> Option> { + let Some(historical_spos) = self.historical_spos.as_ref() else { + return None; + }; + + let votes: Option> = + historical_spos.get(pool_id).map(|s| s.votes.clone()).flatten(); + votes + } + /// Get pool relay pub fn get_pool_relays(&self, pool_id: &KeyHash) -> Option> { self.spos.get(pool_id).map(|p| p.relays.clone()) @@ -252,7 +284,16 @@ impl State { ))) } - fn handle_pool_registration(&mut self, block: &BlockInfo, reg: &PoolRegistration) { + fn handle_pool_registration( + &mut self, + block: &BlockInfo, + reg_with_pos: &PoolRegistrationWithPos, + ) { + let PoolRegistrationWithPos { + reg, + tx_hash, + cert_index, + } = reg_with_pos; debug!( block = block.number, "Registering SPO {}", @@ -273,9 +314,28 @@ impl State { ); } } + + // update historical spos + if let Some(historical_spos) = self.historical_spos.as_mut() { + // Don't check there was registration already or not + // because we don't remove registration when pool is retired. + let historical_spo = historical_spos + .entry(reg.operator.clone()) + .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); + historical_spo.add_pool_registration(reg); + historical_spo.add_pool_updates(PoolUpdateEvent::register_event( + tx_hash.clone(), + *cert_index, + )); + } } - fn handle_pool_retirement(&mut self, block: &BlockInfo, ret: &PoolRetirement) { + fn handle_pool_retirement(&mut self, block: &BlockInfo, ret_with_pos: &PoolRetirementWithPos) { + let PoolRetirementWithPos { + ret, + tx_hash, + cert_index, + } = ret_with_pos; debug!( "SPO {} wants to retire at the end of epoch {} (cert in block number {})", hex::encode(&ret.operator), @@ -309,6 +369,19 @@ impl State { } self.pending_deregistrations.entry(ret.epoch).or_default().push(ret.operator.clone()); } + + // update historical spos + if let Some(historical_spos) = self.historical_spos.as_mut() { + if let Some(historical_spo) = historical_spos.get_mut(&ret.operator) { + historical_spo + .add_pool_updates(PoolUpdateEvent::retire_event(tx_hash.clone(), *cert_index)); + } else { + error!( + "Historical SPO for {} not registered when try to retire it", + hex::encode(&ret.operator) + ); + } + } } fn register_stake_address(&mut self, credential: &StakeCredential) { @@ -414,11 +487,11 @@ impl State { for tx_cert in tx_certs_msg.certificates.iter() { match tx_cert { // for spo_state - TxCertificate::PoolRegistration(reg) => { - self.handle_pool_registration(block, reg); + TxCertificate::PoolRegistrationWithPos(reg_with_pos) => { + self.handle_pool_registration(block, ®_with_pos); } - TxCertificate::PoolRetirement(ret) => { - self.handle_pool_retirement(block, ret); + TxCertificate::PoolRetirementWithPos(ret_with_pos) => { + self.handle_pool_retirement(block, &ret_with_pos); } // for stake addresses @@ -475,6 +548,40 @@ impl State { Ok(()) } + pub fn handle_governance( + &mut self, + voting_procedures: &[(TxHash, VotingProcedures)], + ) -> Result<()> { + // when we save historical spo's vote + let Some(historical_spos) = self.historical_spos.as_mut() else { + return Ok(()); + }; + + for (tx_hash, voting_procedures) in voting_procedures { + for (voter, single_votes) in &voting_procedures.votes { + let spo = match voter { + Voter::StakePoolKey(spo) => spo, + _ => continue, + }; + + let historical_spo = historical_spos + .entry(spo.clone()) + .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); + + if let Some(votes) = historical_spo.votes.as_mut() { + for (_, vp) in &single_votes.voting_procedures { + votes.push(VoteRecord { + tx_hash: tx_hash.clone(), + vote_index: vp.vote_index, + vote: vp.vote.clone(), + }); + } + } + } + } + Ok(()) + } + /// Handle stake deltas pub fn handle_stake_deltas(&mut self, deltas_msg: &StakeAddressDeltasMessage) -> Result<()> { let Some(stake_addresses) = self.stake_addresses.as_ref() else { @@ -521,7 +628,7 @@ mod tests { use crate::test_utils::*; use acropolis_common::{ state_history::{StateHistory, StateHistoryStore}, - PoolRetirement, Ratio, TxCertificate, + PoolRetirement, Ratio, TxCertificate, TxHash, }; use tokio::sync::Mutex; @@ -556,20 +663,26 @@ mod tests { async fn spo_gets_registered() { let mut state = State::default(); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { - operator: vec![0], - vrf_key_hash: vec![0], - pledge: 0, - cost: 0, - margin: Ratio { - numerator: 0, - denominator: 0, + msg.certificates.push(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: vec![0], + vrf_key_hash: vec![0], + pledge: 0, + cost: 0, + margin: Ratio { + numerator: 0, + denominator: 0, + }, + reward_account: vec![0], + pool_owners: vec![vec![0]], + relays: vec![], + pool_metadata: None, + }, + tx_hash: TxHash::default(), + cert_index: 1, }, - reward_account: vec![0], - pool_owners: vec![vec![0]], - relays: vec![], - pool_metadata: None, - })); + )); let block = new_block(1); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); @@ -581,10 +694,16 @@ mod tests { async fn pending_deregistration_gets_queued() { let mut state = State::default(); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 1, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 1, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); let block = new_block(0); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.pending_deregistrations.len()); @@ -601,18 +720,30 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 2, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 2, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![1], - epoch: 2, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![1], + epoch: 2, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.pending_deregistrations.len()); @@ -634,20 +765,32 @@ mod tests { let mut state = history.lock().await.get_current_state(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 2, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 2, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); let mut state = history.lock().await.get_current_state(); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![1], - epoch: 2, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![1], + epoch: 2, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); @@ -669,20 +812,26 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { - operator: vec![0], - vrf_key_hash: vec![0], - pledge: 0, - cost: 0, - margin: Ratio { - numerator: 0, - denominator: 0, + msg.certificates.push(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: vec![0], + vrf_key_hash: vec![0], + pledge: 0, + cost: 0, + margin: Ratio { + numerator: 0, + denominator: 0, + }, + reward_account: vec![0], + pool_owners: vec![vec![0]], + relays: vec![], + pool_metadata: None, + }, + tx_hash: TxHash::default(), + cert_index: 0, }, - reward_account: vec![0], - pool_owners: vec![vec![0]], - relays: vec![], - pool_metadata: None, - })); + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); @@ -691,10 +840,16 @@ mod tests { block.number = 1; let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 1, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 1, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.epoch = 1; // SPO get retired at the start of the epoch it requests @@ -713,20 +868,26 @@ mod tests { let mut state = history.lock().await.get_current_state(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistration(PoolRegistration { - operator: vec![0], - vrf_key_hash: vec![0], - pledge: 0, - cost: 0, - margin: Ratio { - numerator: 0, - denominator: 0, + msg.certificates.push(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: vec![0], + vrf_key_hash: vec![0], + pledge: 0, + cost: 0, + margin: Ratio { + numerator: 0, + denominator: 0, + }, + reward_account: vec![0], + pool_owners: vec![vec![0]], + relays: vec![], + pool_metadata: None, + }, + tx_hash: TxHash::default(), + cert_index: 0, }, - reward_account: vec![0], - pool_owners: vec![vec![0]], - relays: vec![], - pool_metadata: None, - })); + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); let spo = state.spos.get(&vec![0u8]); @@ -736,10 +897,16 @@ mod tests { let mut state = history.lock().await.get_current_state(); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 1, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 1, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); @@ -772,18 +939,30 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![0], - epoch: 2, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![0], + epoch: 2, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirement(PoolRetirement { - operator: vec![1], - epoch: 3, - })); + msg.certificates.push(TxCertificate::PoolRetirementWithPos( + PoolRetirementWithPos { + ret: PoolRetirement { + operator: vec![1], + epoch: 3, + }, + tx_hash: TxHash::default(), + cert_index: 0, + }, + )); assert!(state.handle_tx_certs(&block, &msg).is_ok()); let mut retiring_pools = state.get_retiring_pools(); retiring_pools.sort_by_key(|p| p.epoch); diff --git a/modules/spo_state/src/test_utils.rs b/modules/spo_state/src/test_utils.rs index 055c4e27..fe9891b3 100644 --- a/modules/spo_state/src/test_utils.rs +++ b/modules/spo_state/src/test_utils.rs @@ -2,7 +2,7 @@ use acropolis_common::{ messages::{ EpochActivityMessage, SPORewardsMessage, SPOStakeDistributionMessage, TxCertificatesMessage, }, - BlockInfo, BlockStatus, Era, TxCertificate, + BlockHash, BlockInfo, BlockStatus, Era, TxCertificate, }; use crate::store_config::StoreConfig; @@ -48,7 +48,7 @@ pub fn new_block(epoch: u64) -> BlockInfo { status: BlockStatus::Immutable, slot: 0, number: 10 * epoch, - hash: Vec::::new(), + hash: BlockHash::default(), epoch, epoch_slot: 0, new_epoch: true, diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index 3bbbbdfd..5bb471f9 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -400,7 +400,7 @@ pub fn process_message( mod test { use crate::*; use acropolis_common::{ - messages::AddressDeltasMessage, Address, AddressDelta, BlockInfo, BlockStatus, + messages::AddressDeltasMessage, Address, AddressDelta, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, ShelleyAddress, ShelleyAddressDelegationPart, ShelleyAddressPaymentPart, ShelleyAddressPointer, StakeAddress, StakeAddressPayload, ValueDelta, }; @@ -539,7 +539,7 @@ mod test { status: BlockStatus::Immutable, slot: 2498243, number: 1, - hash: vec![], + hash: BlockHash::default(), epoch: 1, epoch_slot: 14243, new_epoch: true, diff --git a/modules/tx_unpacker/src/map_parameters.rs b/modules/tx_unpacker/src/map_parameters.rs index f631cc41..0c331075 100644 --- a/modules/tx_unpacker/src/map_parameters.rs +++ b/modules/tx_unpacker/src/map_parameters.rs @@ -234,32 +234,42 @@ pub fn map_certificate( pool_owners, relays, pool_metadata, - } => Ok(TxCertificate::PoolRegistration(PoolRegistration { - operator: operator.to_vec(), - vrf_key_hash: vrf_keyhash.to_vec(), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: reward_account.to_vec(), - pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), - relays: relays.into_iter().map(|relay| map_relay(relay)).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, + } => Ok(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: operator.to_vec(), + vrf_key_hash: vrf_keyhash.to_vec(), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, + }, + reward_account: reward_account.to_vec(), + pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), + relays: relays.into_iter().map(|relay| map_relay(relay)).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, + }, + }, + tx_hash, + cert_index: cert_index as u64, }, - })), - alonzo::Certificate::PoolRetirement(pool_key_hash, epoch) => { - Ok(TxCertificate::PoolRetirement(PoolRetirement { - operator: pool_key_hash.to_vec(), - epoch: *epoch, - })) - } + )), + alonzo::Certificate::PoolRetirement(pool_key_hash, epoch) => Ok( + TxCertificate::PoolRetirementWithPos(PoolRetirementWithPos { + ret: PoolRetirement { + operator: pool_key_hash.to_vec(), + epoch: *epoch, + }, + tx_hash, + cert_index: cert_index as u64, + }), + ), alonzo::Certificate::GenesisKeyDelegation( genesis_hash, genesis_delegate_hash, @@ -326,32 +336,42 @@ pub fn map_certificate( pool_owners, relays, pool_metadata, - } => Ok(TxCertificate::PoolRegistration(PoolRegistration { - operator: operator.to_vec(), - vrf_key_hash: vrf_keyhash.to_vec(), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: reward_account.to_vec(), - pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), - relays: relays.into_iter().map(|relay| map_relay(relay)).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, + } => Ok(TxCertificate::PoolRegistrationWithPos( + PoolRegistrationWithPos { + reg: PoolRegistration { + operator: operator.to_vec(), + vrf_key_hash: vrf_keyhash.to_vec(), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, + }, + reward_account: reward_account.to_vec(), + pool_owners: pool_owners.into_iter().map(|v| v.to_vec()).collect(), + relays: relays.into_iter().map(|relay| map_relay(relay)).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, + }, + }, + tx_hash, + cert_index: cert_index as u64, }, - })), - conway::Certificate::PoolRetirement(pool_key_hash, epoch) => { - Ok(TxCertificate::PoolRetirement(PoolRetirement { - operator: pool_key_hash.to_vec(), - epoch: *epoch, - })) - } + )), + conway::Certificate::PoolRetirement(pool_key_hash, epoch) => Ok( + TxCertificate::PoolRetirementWithPos(PoolRetirementWithPos { + ret: PoolRetirement { + operator: pool_key_hash.to_vec(), + epoch: *epoch, + }, + tx_hash, + cert_index: cert_index as u64, + }), + ), conway::Certificate::Reg(cred, coin) => { Ok(TxCertificate::Registration(Registration { diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs index f128c94e..84c30a37 100644 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/body_fetcher.rs @@ -65,7 +65,7 @@ impl BodyFetcher { let header = MultiEraHeader::decode(h.variant, tag, &h.cbor)?; let slot = header.slot(); let number = header.number(); - let hash = header.hash().to_vec(); + let hash = *header.hash(); let (epoch, epoch_slot) = self.cfg.slot_to_epoch(slot); let new_epoch = match self.last_epoch { @@ -110,7 +110,7 @@ impl BodyFetcher { // Fetch and publish the block itself - note we need to // reconstruct a Point from the header because the one we get // in the RollForward is the *tip*, not the next read point - let fetch_point = Point::Specific(slot, hash.clone()); + let fetch_point = Point::Specific(slot, hash.to_vec()); let msg_body = self.fetch_block(fetch_point).await?; // Construct message diff --git a/modules/upstream_chain_fetcher/src/upstream_cache.rs b/modules/upstream_chain_fetcher/src/upstream_cache.rs index 5a3332c4..9281a358 100644 --- a/modules/upstream_chain_fetcher/src/upstream_cache.rs +++ b/modules/upstream_chain_fetcher/src/upstream_cache.rs @@ -172,7 +172,7 @@ mod test { use crate::upstream_cache::{Storage, UpstreamCacheImpl, UpstreamCacheRecord}; use acropolis_common::{ messages::{BlockBodyMessage, BlockHeaderMessage}, - BlockInfo, BlockStatus, Era, + BlockHash, BlockInfo, BlockStatus, Era, }; use anyhow::Result; use std::{collections::HashMap, sync::Arc}; @@ -182,7 +182,7 @@ mod test { status: BlockStatus::Volatile, slot: n, number: n, - hash: vec![], + hash: BlockHash::default(), epoch: 0, epoch_slot: n, new_epoch: false, diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs index a6203ba5..80b9e368 100644 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs @@ -190,7 +190,7 @@ impl UpstreamChainFetcher { let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir); let point = match Self::read_cache(cfg.clone(), &mut upstream_cache).await? { None => Point::Origin, - Some(blk) => Point::Specific(blk.slot, blk.hash), + Some(blk) => Point::Specific(blk.slot, blk.hash.to_vec()), }; Self::sync_to_point(cfg, peer.clone(), Some(upstream_cache), point).await?; @@ -210,7 +210,7 @@ impl UpstreamChainFetcher { "Notified snapshot complete at slot {} block number {}", block.slot, block.number ); - let point = Point::Specific(block.slot, block.hash.clone()); + let point = Point::Specific(block.slot, block.hash.to_vec()); Self::sync_to_point(cfg, peer, None, point).await?; } None => info!("Completion not received. Exiting ..."), diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index a09a2f83..48e0791f 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -409,7 +409,7 @@ impl State { mod tests { use super::*; use crate::InMemoryImmutableUTXOStore; - use acropolis_common::{ByronAddress, Era, NativeAsset, Value}; + use acropolis_common::{BlockHash, ByronAddress, Era, NativeAsset, Value}; use config::Config; use tokio::sync::Mutex; @@ -425,7 +425,7 @@ mod tests { status, slot, number, - hash: vec![], + hash: BlockHash::default(), epoch: 99, epoch_slot: slot, new_epoch: false, diff --git a/processes/golden_tests/Cargo.toml b/processes/golden_tests/Cargo.toml index 704cf9ee..37ce23dc 100644 --- a/processes/golden_tests/Cargo.toml +++ b/processes/golden_tests/Cargo.toml @@ -27,7 +27,7 @@ acropolis_module_drep_state = { path = "../../modules/drep_state" } acropolis_module_governance_state = { path = "../../modules/governance_state" } acropolis_module_parameters_state = { path = "../../modules/parameters_state" } acropolis_module_stake_delta_filter = { path = "../../modules/stake_delta_filter" } -acropolis_module_epoch_activity_counter = { path = "../../modules/epoch_activity_counter" } +acropolis_module_epochs_state = { path = "../../modules/epochs_state" } acropolis_module_accounts_state = { path = "../../modules/accounts_state" } anyhow = "1.0" diff --git a/processes/golden_tests/src/test_module.rs b/processes/golden_tests/src/test_module.rs index 2d606be7..61377e6b 100644 --- a/processes/golden_tests/src/test_module.rs +++ b/processes/golden_tests/src/test_module.rs @@ -4,7 +4,7 @@ use acropolis_common::{ CardanoMessage, Message, RawTxsMessage, SnapshotDumpMessage, SnapshotMessage, SnapshotStateMessage, }, - BlockInfo, BlockStatus, Era, + BlockHash, BlockInfo, BlockStatus, Era, }; use anyhow::{Context as AnyhowContext, Result}; use caryatid_sdk::{module, Context, Module}; @@ -46,7 +46,7 @@ impl TestModule { status: BlockStatus::Volatile, slot: 1, number: 1, - hash: vec![], + hash: BlockHash::default(), epoch: 1, epoch_slot: 1, new_epoch: false, diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 77ca90cd..3fb11193 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -29,7 +29,7 @@ acropolis_module_drep_state = { path = "../../modules/drep_state" } acropolis_module_governance_state = { path = "../../modules/governance_state" } acropolis_module_parameters_state = { path = "../../modules/parameters_state" } acropolis_module_stake_delta_filter = { path = "../../modules/stake_delta_filter" } -acropolis_module_epoch_activity_counter = { path = "../../modules/epoch_activity_counter" } +acropolis_module_epochs_state = { path = "../../modules/epochs_state" } acropolis_module_accounts_state = { path = "../../modules/accounts_state" } acropolis_module_rest_blockfrost = { path = "../../modules/rest_blockfrost" } acropolis_module_spdd_state = { path = "../../modules/spdd_state" } diff --git a/processes/omnibus/omnibus-sancho.toml b/processes/omnibus/omnibus-sancho.toml index 8ba5e6f1..14a60928 100644 --- a/processes/omnibus/omnibus-sancho.toml +++ b/processes/omnibus/omnibus-sancho.toml @@ -9,8 +9,6 @@ network-name = "sanchonet" # "sanchonet", "mainnet" #genesis-key = "5b3139312c36362c3134302c3138352c3133382c31312c3233372c3230372c3235302c3134342c32372c322c3138382c33302c31322c38312c3135352c3230342c31302c3137392c37352c32332c3133382c3139362c3231372c352c31342c32302c35372c37392c33392c3137365d" #download = false -[module.rest-blockfrost] - [module.upstream-chain-fetcher] sync-point = "cache" #"cache" # "origin", "tip", "snapshot" node-address = "sancho-testnet.able-pool.io:6002" @@ -18,8 +16,11 @@ magic-number = 4 [module.block-unpacker] +[module.rest-blockfrost] + [module.tx-unpacker] publish-utxo-deltas-topic = "cardano.utxo.deltas" +publish-asset-deltas-topic = "cardano.asset.deltas" publish-withdrawals-topic = "cardano.withdrawals" publish-certificates-topic = "cardano.certificates" publish-governance-topic = "cardano.governance" @@ -30,8 +31,38 @@ store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-as address-delta-topic = "cardano.address.delta" [module.spo-state] +# Enables /pools/{pool_id}/history endpoint +store-epochs-history = false +# Enable /pools/retired +store-retired-pools = false +# Enables /pools/{pool_id} endpoint +store-registration = false +# # Enables /pools/{pool_id}/updates endpoint +store-updates = false +# Enables /pools/{pool_id}/delegators endpoint (Requires store-stake-addresses to be enabled) +store-delegators = false +# Enables /pools/{pool_id}/votes endpoint +store-votes = false +# Store stake_addresses +store-stake-addresses = false + +[module.spdd-state] +store-spdd = false [module.drep-state] +# Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) +store-info = false +# Enables /governance/dreps/{drep_id}/delegators endpoint +store-delegators = false +# Enables /governance/dreps/{drep_id}/metadata endpoint +store-metadata = false +# Enables /governance/dreps/{drep_id}/updates endpoint +store-updates = false +# Enables /governance/dreps/{drep_id}/votes endpoint +store-votes = false + +[module.drdd-state] +store-drdd = false [module.governance-state] @@ -43,11 +74,16 @@ network-name = "sanchonet" # "sanchonet", "mainnet" cache-mode = "write" # "predefined", "read", "write", "write-if-absent" write-full-cache = "false" -[module.epoch-activity-counter] +[module.epochs-state] +# Enables /epochs/{number} endpoint (for historical epochs) store-history = false +# Enables /pools/{pool_id}/blocks endpoint +store-block-hashes = false [module.accounts-state] +[module.assets-state] + [module.clock] [module.rest-server] diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index c41f1db5..f59b3cd1 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -32,12 +32,19 @@ store = "memory" # "memory", "dashmap", "fjall", "fjall-async", "sled", "sled-as address-delta-topic = "cardano.address.delta" [module.spo-state] +# Enables /pools/{pool_id}/history endpoint, enables to query active_stakes store-epochs-history = false +# Enable /pools/retired store-retired-pools = false +# Enables /pools/{pool_id} endpoint store-registration = false +# # Enables /pools/{pool_id}/updates endpoint store-updates = false +# Enables /pools/{pool_id}/delegators endpoint (Requires store-stake-addresses to be enabled) store-delegators = false +# Enables /pools/{pool_id}/votes endpoint store-votes = false +# Store stake_addresses store-stake-addresses = false [module.spdd-state] @@ -67,8 +74,11 @@ store-history = false cache-mode = "predefined" # "predefined", "read", "write", "write-if-absent" write-full-cache = "false" -[module.epoch-activity-counter] +[module.epochs-state] +# Enables /epochs/{number} endpoint (for historical epochs) store-history = false +# Enables /pools/{pool_id}/blocks endpoint +store-block-hashes = false [module.accounts-state] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 10809608..ecbebffe 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -14,7 +14,7 @@ use acropolis_module_assets_state::AssetsState; use acropolis_module_block_unpacker::BlockUnpacker; use acropolis_module_drdd_state::DRDDState; use acropolis_module_drep_state::DRepState; -use acropolis_module_epoch_activity_counter::EpochActivityCounter; +use acropolis_module_epochs_state::EpochsState; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_governance_state::GovernanceState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; @@ -97,7 +97,7 @@ pub async fn main() -> Result<()> { GovernanceState::register(&mut process); ParametersState::register(&mut process); StakeDeltaFilter::register(&mut process); - EpochActivityCounter::register(&mut process); + EpochsState::register(&mut process); AccountsState::register(&mut process); AssetsState::register(&mut process); BlockfrostREST::register(&mut process); diff --git a/processes/replayer/Cargo.toml b/processes/replayer/Cargo.toml index 429e95b3..1061febc 100644 --- a/processes/replayer/Cargo.toml +++ b/processes/replayer/Cargo.toml @@ -29,7 +29,7 @@ acropolis_module_drep_state = { path = "../../modules/drep_state" } acropolis_module_governance_state = { path = "../../modules/governance_state" } acropolis_module_parameters_state = { path = "../../modules/parameters_state" } acropolis_module_stake_delta_filter = { path = "../../modules/stake_delta_filter" } -acropolis_module_epoch_activity_counter = { path = "../../modules/epoch_activity_counter" } +acropolis_module_epochs_state = { path = "../../modules/epochs_state" } acropolis_module_accounts_state = { path = "../../modules/accounts_state" } anyhow = "1.0" diff --git a/processes/replayer/replayer.toml b/processes/replayer/replayer.toml index 40080f15..806f5b17 100644 --- a/processes/replayer/replayer.toml +++ b/processes/replayer/replayer.toml @@ -46,7 +46,7 @@ subscribe-topic = "cardano.governance" cache-mode = "predefined" # "predefined", "read", "write", "write-if-absent" write-full-cache = "false" -[module.epoch-activity-counter] +[module.epochs-state] [module.accounts-state] diff --git a/processes/replayer/src/main.rs b/processes/replayer/src/main.rs index 9846508f..2f4e28ef 100644 --- a/processes/replayer/src/main.rs +++ b/processes/replayer/src/main.rs @@ -14,7 +14,7 @@ use tracing_subscriber::{filter, fmt, EnvFilter, Registry}; use acropolis_module_accounts_state::AccountsState; use acropolis_module_block_unpacker::BlockUnpacker; use acropolis_module_drep_state::DRepState; -use acropolis_module_epoch_activity_counter::EpochActivityCounter; +use acropolis_module_epochs_state::EpochsState; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_governance_state::GovernanceState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; @@ -51,7 +51,7 @@ fn setup_governance_collect(process: &mut dyn ModuleRegistry) { GovernanceState::register(process); ParametersState::register(process); StakeDeltaFilter::register(process); - EpochActivityCounter::register(process); + EpochsState::register(process); AccountsState::register(process); Recorder::register(process); @@ -75,7 +75,7 @@ fn setup_alonzo_governance_collect(process: &mut dyn ModuleRegistry) { GovernanceState::register(process); ParametersState::register(process); StakeDeltaFilter::register(process); - EpochActivityCounter::register(process); + EpochsState::register(process); AccountsState::register(process); */ RecorderAlonzoGovernance::register(process); diff --git a/processes/replayer/src/playback.rs b/processes/replayer/src/playback.rs index 326d4a24..6ae07e9f 100644 --- a/processes/replayer/src/playback.rs +++ b/processes/replayer/src/playback.rs @@ -5,7 +5,7 @@ use acropolis_common::{ CardanoMessage, DRepStakeDistributionMessage, GovernanceProceduresMessage, Message, SPOStakeDistributionMessage, }, - BlockInfo, + BlockHash, BlockInfo, }; use anyhow::{anyhow, bail, ensure, Result}; use caryatid_sdk::{module, Context, Module}; @@ -121,7 +121,7 @@ impl PlaybackRunner { let mut curr_blk = prev_blk.clone(); curr_blk.slot += curr_block_num - prev_blk.number; curr_blk.number = curr_block_num; - curr_blk.hash = Vec::new(); + curr_blk.hash = BlockHash::default(); curr_blk.new_epoch = false; ensure!(curr_blk.slot < pending_blk.slot);