Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 252 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion common/src/queries/accounts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use crate::{DRepChoice, KeyHash};
use crate::{DRepChoice, KeyHash, PoolLiveStakeInfo};

pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) =
("accounts-state-query-topic", "cardano.query.accounts");
Expand All @@ -27,6 +27,7 @@ pub enum AccountsStateQuery {
GetOptimalPoolSizing,
GetPoolsLiveStakes { pools_operators: Vec<Vec<u8>> },
GetPoolDelegators { pool_operator: KeyHash },
GetPoolLiveStake { pool_operator: KeyHash },

// Dreps related queries
GetDrepDelegators { drep: DRepChoice },
Expand Down Expand Up @@ -55,6 +56,7 @@ pub enum AccountsStateQueryResponse {
OptimalPoolSizing(Option<OptimalPoolSizing>),
PoolsLiveStakes(Vec<u64>),
PoolDelegators(PoolDelegators),
PoolLiveStake(PoolLiveStakeInfo),

// DReps related responses
DrepDelegators(DrepDelegators),
Expand Down
39 changes: 3 additions & 36 deletions common/src/queries/epochs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, BlockHash, KeyHash};
use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, KeyHash};

pub const DEFAULT_EPOCHS_QUERY_TOPIC: (&str, &str) =
("epochs-state-query-topic", "cardano.query.epochs");
Expand All @@ -13,12 +13,7 @@ pub enum EpochsStateQuery {
GetEpochStakeDistributionByPool { epoch_number: u64 },
GetEpochBlockDistribution { epoch_number: u64 },
GetEpochBlockDistributionByPool { epoch_number: u64 },

// Pools related queries
GetBlocksMintedByPools { vrf_key_hashes: Vec<KeyHash> },
GetTotalBlocksMintedByPools { vrf_key_hashes: Vec<KeyHash> },
GetBlocksMintedInfoByPool { vrf_key_hash: KeyHash },
GetBlockHashesByPool { vrf_key_hash: KeyHash },
GetLatestEpochBlocksMintedByPool { vrf_key_hash: KeyHash },
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand All @@ -31,12 +26,7 @@ pub enum EpochsStateQueryResponse {
EpochStakeDistributionByPool(EpochStakeDistributionByPool),
EpochBlockDistribution(EpochBlockDistribution),
EpochBlockDistributionByPool(EpochBlockDistributionByPool),

// Pools related responses
BlocksMintedByPools(BlocksMintedByPools),
TotalBlocksMintedByPools(TotalBlocksMintedByPools),
BlocksMintedInfoByPool(BlocksMintedInfoByPool),
BlockHashesByPool(BlockHashesByPool),
LatestEpochBlocksMintedByPool(u64),

NotFound,
Error(String),
Expand Down Expand Up @@ -74,26 +64,3 @@ pub struct EpochBlockDistribution {}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EpochBlockDistributionByPool {}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlocksMintedByPools {
// this is in same order of vrf_key_hashes from EpochsStateQuery::BlocksMintedByPools
pub blocks_minted: Vec<u64>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TotalBlocksMintedByPools {
// this is in same order of vrf_key_hashes from EpochsStateQuery::TotalBlocksMinted
pub total_blocks_minted: Vec<u64>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlocksMintedInfoByPool {
pub total_blocks_minted: u64,
pub epoch_blocks_minted: u64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlockHashesByPool {
pub hashes: Vec<BlockHash>,
}
35 changes: 22 additions & 13 deletions common/src/queries/pools.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
queries::governance::VoteRecord, rational_number::RationalNumber, KeyHash, PoolEpochState,
PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay,
queries::governance::VoteRecord, rational_number::RationalNumber, BlockHash, KeyHash,
PoolEpochState, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay,
};

pub const DEFAULT_POOLS_QUERY_TOPIC: (&str, &str) =
Expand All @@ -20,26 +20,35 @@ pub enum PoolsStateQuery {
pools_operators: Vec<KeyHash>,
epoch: u64,
},
GetPoolsTotalBlocksMinted {
pools_operators: Vec<KeyHash>,
},
GetPoolInfo {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolHistory {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolMetadata {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolRelays {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolDelegators {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolTotalBlocksMinted {
pool_id: KeyHash,
},
GetPoolBlockHashes {
pool_id: KeyHash,
},
GetPoolUpdates {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
GetPoolVotes {
pool_id: Vec<u8>,
pool_id: KeyHash,
},
}

Expand All @@ -51,11 +60,14 @@ pub enum PoolsStateQueryResponse {
PoolsRetiringList(Vec<PoolRetirement>),
PoolActiveStakeInfo(PoolActiveStakeInfo),
PoolsActiveStakes(Vec<u64>),
PoolInfo(PoolInfo),
PoolsTotalBlocksMinted(Vec<u64>),
PoolInfo(PoolRegistration),
PoolHistory(Vec<PoolEpochState>),
PoolMetadata(PoolMetadata),
PoolRelays(Vec<Relay>),
PoolDelegators(PoolDelegators),
PoolTotalBlocksMinted(u64),
PoolBlockHashes(Vec<BlockHash>),
PoolUpdates(Vec<PoolUpdateEvent>),
PoolVotes(Vec<VoteRecord>),
NotFound,
Expand All @@ -73,9 +85,6 @@ pub struct PoolActiveStakeInfo {
pub active_size: RationalNumber,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolInfo {}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolDelegators {
pub delegators: Vec<(KeyHash, u64)>,
Expand Down
32 changes: 30 additions & 2 deletions common/src/stake_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use std::{

use crate::{
math::update_value_with_delta, messages::DRepDelegationDistribution, DRepChoice,
DRepCredential, DelegatedStake, KeyHash, Lovelace, StakeAddressDelta, StakeCredential,
Withdrawal,
DRepCredential, DelegatedStake, KeyHash, Lovelace, PoolLiveStakeInfo, StakeAddressDelta,
StakeCredential, Withdrawal,
};
use anyhow::Result;
use dashmap::DashMap;
Expand Down Expand Up @@ -110,6 +110,34 @@ impl StakeAddressMap {
self.get(stake_key).map(|sas| sas.registered).unwrap_or(false)
}

/// Get Pool's Live Stake Info
pub fn get_pool_live_stake_info(&self, spo: &KeyHash) -> PoolLiveStakeInfo {
let total_live_stakes = AtomicU64::new(0);
let live_stake = AtomicU64::new(0);
let live_delegators = AtomicU64::new(0);

// Par Iter stake addresses values
self.inner.par_iter().for_each(|(_, sas)| {
total_live_stakes.fetch_add(sas.utxo_value, std::sync::atomic::Ordering::Relaxed);
if sas.delegated_spo.as_ref().map(|d_spo| d_spo.eq(spo)).unwrap_or(false) {
live_stake.fetch_add(
sas.utxo_value + sas.rewards,
std::sync::atomic::Ordering::Relaxed,
);
live_delegators.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
});

let total_live_stakes = total_live_stakes.load(std::sync::atomic::Ordering::Relaxed);
let live_stake = live_stake.load(std::sync::atomic::Ordering::Relaxed);
let live_delegators = live_delegators.load(std::sync::atomic::Ordering::Relaxed);
PoolLiveStakeInfo {
live_stake,
live_delegators,
total_live_stakes,
}
}

/// Get Pool's Live Stake (same order as spos)
pub fn get_pools_live_stakes(&self, spos: &Vec<KeyHash>) -> Vec<u64> {
let mut live_stakes_map = HashMap::<KeyHash, u64>::new();
Expand Down
10 changes: 9 additions & 1 deletion common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ pub struct PoolRetirement {
}

/// Pool Update Action
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PoolUpdateAction {
Registered,
Deregistered,
Expand Down Expand Up @@ -670,6 +670,14 @@ impl PoolUpdateEvent {
}
}

/// Pool Live Stake Info
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolLiveStakeInfo {
pub live_stake: u64,
pub live_delegators: u64,
pub total_live_stakes: u64,
}

/// Pool Epoch History Data
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PoolEpochState {
Expand Down
6 changes: 6 additions & 0 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ impl AccountsState {
})
}

AccountsStateQuery::GetPoolLiveStake { pool_operator } => {
AccountsStateQueryResponse::PoolLiveStake(
state.get_pool_live_stake_info(pool_operator),
)
}

AccountsStateQuery::GetDrepDelegators { drep } => {
AccountsStateQueryResponse::DrepDelegators(DrepDelegators {
delegators: state.get_drep_delegators(drep),
Expand Down
6 changes: 6 additions & 0 deletions modules/accounts_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::monetary::calculate_monetary_change;
use crate::rewards::{RewardsResult, RewardsState};
use crate::snapshot::Snapshot;
use acropolis_common::queries::accounts::OptimalPoolSizing;
use acropolis_common::PoolLiveStakeInfo;
use acropolis_common::{
math::update_value_with_delta,
messages::{
Expand Down Expand Up @@ -106,6 +107,11 @@ impl State {
Some(OptimalPoolSizing { total_supply, nopt })
}

/// Get Pool Live Stake Info
pub fn get_pool_live_stake_info(&self, pool_operator: &KeyHash) -> PoolLiveStakeInfo {
self.stake_addresses.lock().unwrap().get_pool_live_stake_info(pool_operator)
}

/// Get Pools Live stake
pub fn get_pools_live_stakes(&self, pool_operators: &Vec<KeyHash>) -> Vec<u64> {
self.stake_addresses.lock().unwrap().get_pools_live_stakes(pool_operators)
Expand Down
6 changes: 3 additions & 3 deletions modules/epochs_state/src/epochs_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ mod tests {

#[test]
fn epochs_history_is_none_when_store_history_is_false() {
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false, false));
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false));
assert!(epochs_history.epochs_history.is_none());
}

#[test]
fn epochs_history_is_some_when_store_history_is_true() {
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false));
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true));
assert!(epochs_history.epochs_history.is_some());
}

#[test]
fn handle_epoch_activity_saves_history() {
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true, false));
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true));
let block = make_block(200);
epochs_history.handle_epoch_activity(
&block,
Expand Down
48 changes: 6 additions & 42 deletions modules/epochs_state/src/epochs_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use acropolis_common::{
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
queries::epochs::{
BlockHashesByPool, BlocksMintedByPools, BlocksMintedInfoByPool, EpochInfo,
EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, TotalBlocksMintedByPools,
EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch,
DEFAULT_EPOCHS_QUERY_TOPIC,
},
state_history::{StateHistory, StateHistoryStore},
Expand Down Expand Up @@ -50,14 +49,13 @@ impl EpochsState {
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
epochs_history: EpochsHistoryState,
store_config: &StoreConfig,
mut headers_subscription: Box<dyn Subscription<Message>>,
mut fees_subscription: Box<dyn Subscription<Message>>,
mut epoch_activity_publisher: EpochActivityPublisher,
) -> Result<()> {
loop {
// Get a mutable state
let mut state = history.lock().await.get_or_init_with(|| State::new(store_config));
let mut state = history.lock().await.get_or_init_with(|| State::new());
let mut current_block: Option<BlockInfo> = None;

// Read both topics in parallel
Expand Down Expand Up @@ -113,9 +111,7 @@ impl EpochsState {
// are suppressed upstream
match MultiEraHeader::decode(variant, None, &header_msg.raw) {
Ok(header) => {
if let Some(vrf_vkey) = header.vrf_vkey() {
state.handle_mint(&block_info, Some(vrf_vkey));
}
state.handle_mint(&block_info, header.vrf_vkey());
}

Err(e) => error!("Can't decode header {}: {e}", block_info.slot),
Expand Down Expand Up @@ -231,43 +227,12 @@ impl EpochsState {
}
}

EpochsStateQuery::GetBlocksMintedByPools { vrf_key_hashes } => {
EpochsStateQueryResponse::BlocksMintedByPools(BlocksMintedByPools {
blocks_minted: state.get_blocks_minted_by_pools(vrf_key_hashes),
})
}

EpochsStateQuery::GetTotalBlocksMintedByPools { vrf_key_hashes } => {
EpochsStateQueryResponse::TotalBlocksMintedByPools(
TotalBlocksMintedByPools {
total_blocks_minted: state
.get_total_blocks_minted_by_pools(vrf_key_hashes),
},
EpochsStateQuery::GetLatestEpochBlocksMintedByPool { vrf_key_hash } => {
EpochsStateQueryResponse::LatestEpochBlocksMintedByPool(
state.get_latest_epoch_blocks_minted_by_pool(vrf_key_hash),
)
}

EpochsStateQuery::GetBlocksMintedInfoByPool { vrf_key_hash } => {
let (total_blocks_minted, epoch_blocks_minted) =
state.get_blocks_minted_data_by_pool(vrf_key_hash);
EpochsStateQueryResponse::BlocksMintedInfoByPool(BlocksMintedInfoByPool {
total_blocks_minted,
epoch_blocks_minted,
})
}

EpochsStateQuery::GetBlockHashesByPool { vrf_key_hash } => {
if state.is_block_hashes_enabled() {
let hashes = state.get_block_hashes(vrf_key_hash);
EpochsStateQueryResponse::BlockHashesByPool(BlockHashesByPool {
hashes,
})
} else {
EpochsStateQueryResponse::Error(
"Block hashes are not enabled".to_string(),
)
}
}

_ => EpochsStateQueryResponse::Error(format!(
"Unimplemented query variant: {:?}",
query
Expand All @@ -284,7 +249,6 @@ impl EpochsState {
Self::run(
history,
epochs_history,
&store_config,
headers_subscription,
fees_subscription,
epoch_activity_publisher,
Expand Down
Loading