Skip to content

Commit d1aaa5b

Browse files
authored
Merge pull request #136 from input-output-hk/golddydev/pool-delegators
feat: introduce historical SPO state and aggregated state management
2 parents 78ef46f + ec6ba56 commit d1aaa5b

File tree

10 files changed

+898
-828
lines changed

10 files changed

+898
-828
lines changed

common/src/types.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,21 @@ pub struct PoolRetirement {
497497
pub epoch: u64,
498498
}
499499

500+
/// Pool Update Action
501+
#[derive(Debug, Clone, Serialize, Deserialize)]
502+
pub enum PoolUpdateAction {
503+
Registered,
504+
Deregistered,
505+
}
506+
507+
/// Pool Update Event
508+
#[derive(Debug, Clone, Serialize, Deserialize)]
509+
pub struct PoolUpdateEvent {
510+
pub tx_hash: TxHash,
511+
pub cert_index: u64,
512+
pub action: PoolUpdateAction,
513+
}
514+
500515
/// Pool Epoch History Data
501516
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
502517
pub struct PoolEpochState {
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use acropolis_common::{
4+
messages::{EpochActivityMessage, SPOStakeDistributionMessage},
5+
state_history::{StateHistory, StateHistoryStore},
6+
BlockInfo, KeyHash,
7+
};
8+
use dashmap::DashMap;
9+
use imbl::HashMap;
10+
use rayon::prelude::*;
11+
use serde::Serialize;
12+
use tokio::sync::Mutex;
13+
use tracing::error;
14+
15+
// Aggregated SPO State by epoch N-1 (when current epoch is N)
16+
// Active Stakes and total blocks minted count
17+
#[derive(Clone)]
18+
pub struct AggregatedSPOState {
19+
/// Active stakes for each pool operator
20+
/// (epoch number, active stake)
21+
/// Remove elements when epoch number is less than current epoch number
22+
pub active_stakes: Arc<DashMap<KeyHash, BTreeMap<u64, u64>>>,
23+
24+
/// Volatile total blocks minted state, one per epoch
25+
/// Pop on first element when block number is smaller than `current block - SECURITY_PARAMETER_K`
26+
pub total_blocks_minted_history: Arc<Mutex<StateHistory<TotalBlocksMintedState>>>,
27+
}
28+
29+
#[derive(Default, Debug, Clone, Serialize)]
30+
pub struct TotalBlocksMintedState {
31+
/// block number of Epoch Boundary from N-1 to N
32+
block: u64,
33+
/// total blocks minted for each pool operator keyed by vrf_key_hash
34+
/// until the end of Epoch N-1
35+
total_blocks_minted: HashMap<KeyHash, u64>,
36+
}
37+
38+
impl AggregatedSPOState {
39+
pub fn new() -> Self {
40+
Self {
41+
active_stakes: Arc::new(DashMap::new()),
42+
total_blocks_minted_history: Arc::new(Mutex::new(StateHistory::new(
43+
"aggregated-spo-states/total-blocks-minted",
44+
StateHistoryStore::default_block_store(),
45+
))),
46+
}
47+
}
48+
49+
/// Get Pools Active Stakes by epoch and total active stake
50+
/// ## Arguments
51+
/// * `pools_operators` - A vector of pool operator hashes
52+
/// * `epoch` - The epoch to get the active stakes for
53+
/// ## Returns
54+
/// `(Vec<u64>, u64)` - a vector of active stakes for each pool operator and the total active stake.
55+
pub fn get_pools_active_stakes(
56+
&self,
57+
pools_operators: &Vec<KeyHash>,
58+
epoch: u64,
59+
) -> (Vec<u64>, u64) {
60+
let active_stakes = pools_operators
61+
.par_iter()
62+
.map(|spo| self.get_active_stake(spo, epoch).unwrap_or(0))
63+
.collect::<Vec<u64>>();
64+
let total_active_stake = self.get_total_active_stake(epoch);
65+
(active_stakes, total_active_stake)
66+
}
67+
68+
fn get_active_stake(&self, spo: &KeyHash, epoch: u64) -> Option<u64> {
69+
self.active_stakes.get(spo).map(|stakes| stakes.get(&epoch).cloned()).flatten()
70+
}
71+
72+
fn get_total_active_stake(&self, epoch: u64) -> u64 {
73+
self.active_stakes.iter().map(|entry| entry.value().get(&epoch).cloned().unwrap_or(0)).sum()
74+
}
75+
76+
/// Get total blocks minted for each vrf vkey hash
77+
/// ## Arguments
78+
/// * `vrf_key_hashes` - A vector of vrf key hashes
79+
/// ## Returns
80+
/// `Vec<u64>` - a vector of total blocks minted for each vrf key hash.
81+
pub async fn get_total_blocks_minted(&self, vrf_key_hashes: &Vec<KeyHash>) -> Vec<u64> {
82+
let locked_history = self.total_blocks_minted_history.lock().await;
83+
let Some(current) = locked_history.current() else {
84+
return vec![0; vrf_key_hashes.len()];
85+
};
86+
let total_blocks_minted = vrf_key_hashes
87+
.iter()
88+
.map(|vrf_vkey_hash| {
89+
current.total_blocks_minted.get(vrf_vkey_hash).cloned().unwrap_or(0)
90+
})
91+
.collect();
92+
total_blocks_minted
93+
}
94+
95+
/// Handle SPO Stake Distribution
96+
/// Live stake snapshots taken at Epoch N - 1 to N boundary (Mark at Epoch N)
97+
/// Active stake is valid from Epoch N + 1 (Set at Epoch N + 1)
98+
///
99+
pub fn handle_spdd(&self, block: &BlockInfo, spdd_message: &SPOStakeDistributionMessage) {
100+
let SPOStakeDistributionMessage { epoch, spos } = spdd_message;
101+
if *epoch != block.epoch - 1 {
102+
error!(
103+
"SPO Stake Distribution Message's epoch {} is wrong against current block's epoch {}",
104+
*epoch, block.epoch
105+
)
106+
}
107+
let epoch_to_update = *epoch + 2;
108+
109+
// update active stakes
110+
spos.par_iter().for_each(|(spo, value)| {
111+
let mut active_stakes = self
112+
.active_stakes
113+
.entry(spo.clone())
114+
.and_modify(|stakes| stakes.retain(|k, _| *k >= block.epoch))
115+
.or_insert_with(BTreeMap::new);
116+
117+
active_stakes.insert(epoch_to_update, value.active);
118+
});
119+
}
120+
121+
/// Handle Epoch Activity
122+
/// Returns blocks minted amount keyed by spo
123+
///
124+
pub async fn handle_epoch_activity(
125+
&self,
126+
block: &BlockInfo,
127+
epoch_activity_message: &EpochActivityMessage,
128+
) {
129+
let EpochActivityMessage {
130+
epoch,
131+
vrf_vkey_hashes,
132+
..
133+
} = epoch_activity_message;
134+
if *epoch != block.epoch - 1 {
135+
error!(
136+
"Epoch Activity Message's epoch {} is wrong against current block's epoch {}",
137+
*epoch, block.epoch
138+
)
139+
}
140+
141+
let mut locked_history = self.total_blocks_minted_history.lock().await;
142+
let mut total_blocks_minted =
143+
locked_history.get_rolled_back_state(block.number).total_blocks_minted;
144+
145+
// handle blocks_minted state
146+
vrf_vkey_hashes.iter().for_each(|(vrf_vkey_hash, amount)| {
147+
total_blocks_minted
148+
.entry(vrf_vkey_hash.clone())
149+
.and_modify(|v| *v += *amount as u64)
150+
.or_insert(*amount as u64);
151+
});
152+
153+
let new_state = TotalBlocksMintedState {
154+
block: block.number,
155+
total_blocks_minted,
156+
};
157+
158+
locked_history.commit(block.number, new_state);
159+
}
160+
}
161+
162+
#[cfg(test)]
163+
mod tests {
164+
use acropolis_common::DelegatedStake;
165+
166+
use super::*;
167+
use crate::test_utils::*;
168+
169+
#[tokio::test]
170+
async fn new_state_returns_zeros() {
171+
let aggregated_state = AggregatedSPOState::new();
172+
assert!(aggregated_state.active_stakes.is_empty());
173+
let total_blocks_minted =
174+
aggregated_state.get_total_blocks_minted(&vec![vec![11], vec![12]]).await;
175+
assert_eq!(2, total_blocks_minted.len());
176+
assert_eq!(0, total_blocks_minted[0]);
177+
assert_eq!(0, total_blocks_minted[1]);
178+
}
179+
180+
#[test]
181+
fn active_stakes_not_empty_after_handle_spdd() {
182+
let aggregated_state = AggregatedSPOState::new();
183+
let block = new_block(2);
184+
let mut msg = new_spdd_message(1);
185+
msg.spos = vec![
186+
(
187+
vec![1],
188+
DelegatedStake {
189+
active: 1,
190+
active_delegators_count: 1,
191+
live: 1,
192+
},
193+
),
194+
(
195+
vec![2],
196+
DelegatedStake {
197+
active: 2,
198+
active_delegators_count: 2,
199+
live: 2,
200+
},
201+
),
202+
];
203+
aggregated_state.handle_spdd(&block, &msg);
204+
let (active_stakes, total_active_stake) =
205+
aggregated_state.get_pools_active_stakes(&vec![vec![1], vec![2]], 3);
206+
assert_eq!(2, active_stakes.len());
207+
assert_eq!(1, active_stakes[0]);
208+
assert_eq!(2, active_stakes[1]);
209+
assert_eq!(3, total_active_stake);
210+
}
211+
212+
#[tokio::test]
213+
async fn total_blocks_minted_not_empty_after_handle_epoch_activity() {
214+
let aggregated_state = AggregatedSPOState::new();
215+
let block = new_block(2);
216+
let mut msg = new_epoch_activity_message(1);
217+
msg.vrf_vkey_hashes = vec![(vec![11], 1), (vec![12], 2)];
218+
aggregated_state.handle_epoch_activity(&block, &msg).await;
219+
let total_blocks_minted =
220+
aggregated_state.get_total_blocks_minted(&vec![vec![11], vec![12]]).await;
221+
assert_eq!(2, total_blocks_minted.len());
222+
assert_eq!(1, total_blocks_minted[0]);
223+
assert_eq!(2, total_blocks_minted[1]);
224+
}
225+
226+
#[tokio::test]
227+
async fn total_blocks_minted_history_pruned_after_rollback() {
228+
let aggregated_state = AggregatedSPOState::new();
229+
let mut block = new_block(2);
230+
let mut msg = new_epoch_activity_message(1);
231+
msg.vrf_vkey_hashes = vec![(vec![11], 1), (vec![12], 2)];
232+
aggregated_state.handle_epoch_activity(&block, &msg).await;
233+
assert_eq!(
234+
1,
235+
aggregated_state.total_blocks_minted_history.lock().await.len()
236+
);
237+
238+
block = new_block(3);
239+
msg = new_epoch_activity_message(2);
240+
msg.vrf_vkey_hashes = vec![(vec![11], 3), (vec![12], 4)];
241+
aggregated_state.handle_epoch_activity(&block, &msg).await;
242+
assert_eq!(
243+
2,
244+
aggregated_state.total_blocks_minted_history.lock().await.len()
245+
);
246+
247+
block = new_block(4);
248+
msg = new_epoch_activity_message(3);
249+
msg.vrf_vkey_hashes = vec![(vec![11], 5), (vec![12], 6)];
250+
aggregated_state.handle_epoch_activity(&block, &msg).await;
251+
assert_eq!(
252+
3,
253+
aggregated_state.total_blocks_minted_history.lock().await.len()
254+
);
255+
256+
block = new_block(2);
257+
msg = new_epoch_activity_message(1);
258+
msg.vrf_vkey_hashes = vec![(vec![11], 7), (vec![12], 8)];
259+
aggregated_state.handle_epoch_activity(&block, &msg).await;
260+
assert_eq!(
261+
1,
262+
aggregated_state.total_blocks_minted_history.lock().await.len()
263+
);
264+
}
265+
}

modules/spo_state/src/epochs_history.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub struct EpochsHistoryState {
6868
impl EpochsHistoryState {
6969
pub fn new(store_config: StoreConfig) -> Self {
7070
Self {
71-
epochs_history: if store_config.store_history {
71+
epochs_history: if store_config.store_epochs_history {
7272
Some(Arc::new(DashMap::new()))
7373
} else {
7474
None
@@ -149,7 +149,7 @@ impl EpochsHistoryState {
149149
&self,
150150
_block: &BlockInfo,
151151
epoch_activity_message: &EpochActivityMessage,
152-
spos: &Vec<(KeyHash, u64)>,
152+
spos: &Vec<(KeyHash, usize)>,
153153
) {
154154
let Some(epochs_history) = self.epochs_history.as_ref() else {
155155
return;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use acropolis_common::{
2+
queries::governance::VoteRecord, PoolRegistration, PoolUpdateEvent, StakeCredential,
3+
};
4+
use serde::{Deserialize, Serialize};
5+
6+
use crate::store_config::StoreConfig;
7+
8+
// Historical SPO State
9+
// each field can be optional (according to configurations)
10+
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
pub struct HistoricalSPOState {
12+
pub registration: Option<PoolRegistration>,
13+
pub updates: Option<Vec<PoolUpdateEvent>>,
14+
15+
// SPO's delegator's stake credential
16+
pub delegators: Option<Vec<StakeCredential>>,
17+
18+
// SPO's votes
19+
pub votes: Option<Vec<VoteRecord>>,
20+
}
21+
22+
impl HistoricalSPOState {
23+
#[allow(dead_code)]
24+
pub fn new(store_config: StoreConfig) -> Self {
25+
Self {
26+
registration: store_config.store_registration.then(PoolRegistration::default),
27+
updates: store_config.store_updates.then(Vec::new),
28+
delegators: store_config.store_delegators.then(Vec::new),
29+
votes: store_config.store_votes.then(Vec::new),
30+
}
31+
}
32+
}

modules/spo_state/src/retired_pools_history.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::store_config::StoreConfig;
88

99
#[derive(Debug, Clone)]
1010
pub struct RetiredPoolsHistoryState {
11+
/// keyed by epoch
1112
retired_pools_history: Option<Arc<DashMap<u64, Vec<KeyHash>>>>,
1213
}
1314

0 commit comments

Comments
 (0)