Skip to content

Commit 5af169f

Browse files
committed
feat: save stake_address on spo_state from certs, withdrawals, stake_delta messages
- Publish StakeRewardDeltas message from accounts-state and subscribe from spo_state to update stake_addresses - Introduced StakeRewardDeltasPublisher for publishing stake reward deltas.
1 parent 50e90f0 commit 5af169f

File tree

8 files changed

+468
-243
lines changed

8 files changed

+468
-243
lines changed

common/src/messages.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,11 @@ pub struct StakeAddressDeltasMessage {
102102
pub deltas: Vec<StakeAddressDelta>,
103103
}
104104

105-
/// Stake address Diffs message
106-
/// How stake address changes
105+
/// Stake reward deltas message
107106
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
108-
pub struct StakeAddressDiffsMessage {
109-
/// Stake Address Changes
110-
pub diffs: Vec<StakeAddressDiff>,
107+
pub struct StakeRewardDeltasMessage {
108+
/// Stake Reward Deltas
109+
pub deltas: Vec<StakeRewardDelta>,
111110
}
112111

113112
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -245,7 +244,7 @@ pub enum CardanoMessage {
245244
SPOStakeDistribution(SPOStakeDistributionMessage), // SPO delegation distribution (SPDD)
246245
SPORewards(SPORewardsMessage), // SPO rewards distribution (SPRD)
247246
StakeAddressDeltas(StakeAddressDeltasMessage), // Stake part of address deltas
248-
StakeAddressDiffs(StakeAddressDiffsMessage), // Stake Address Diffs
247+
StakeRewardDeltas(StakeRewardDeltasMessage), // Stake Reward Deltas
249248
}
250249

251250
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]

common/src/types.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,11 @@ pub struct StakeAddressDelta {
140140
pub delta: i64,
141141
}
142142

143-
/// Stake Address State Change
144-
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
145-
pub struct StakeAddressDiff {
143+
/// Stake Address Reward change
144+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145+
pub struct StakeRewardDelta {
146146
pub hash: KeyHash,
147-
pub change: StakeAddressChange,
148-
}
149-
150-
/// Stake Address State Change
151-
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
152-
pub enum StakeAddressChange {
153-
StakeAddressRegistered,
154-
StakeAddressDeregistered,
155-
DelegatedToSPO(KeyHash),
156-
DelegatedToDRep(DRepChoice),
157-
ValueDiff(LovelaceDelta),
147+
pub delta: i64,
158148
}
159149

160150
/// Value (lovelace + multiasset)

modules/accounts_state/src/accounts_state.rs

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use acropolis_common::{
66
queries::accounts::{PoolsLiveStakes, DEFAULT_ACCOUNTS_QUERY_TOPIC},
77
rest_helper::handle_rest,
88
state_history::{StateHistory, StateHistoryStore},
9-
BlockInfo, BlockStatus, StakeAddressDiff,
9+
BlockInfo, BlockStatus,
1010
};
1111
use anyhow::Result;
1212
use caryatid_sdk::{message_bus::Subscription, module, Context, Module};
1313
use config::Config;
1414
use std::sync::Arc;
15-
use tokio::sync::Mutex;
15+
use tokio::{join, sync::Mutex};
1616
use tracing::{error, info, info_span, Instrument};
1717

1818
mod drep_distribution_publisher;
@@ -21,9 +21,9 @@ mod spo_distribution_publisher;
2121
use spo_distribution_publisher::SPODistributionPublisher;
2222
mod spo_rewards_publisher;
2323
use spo_rewards_publisher::SPORewardsPublisher;
24-
mod stake_diffs_publisher;
24+
mod stake_reward_deltas_publisher;
2525
mod state;
26-
use stake_diffs_publisher::StakeDiffsPublisher;
26+
use stake_reward_deltas_publisher::StakeRewardDeltasPublisher;
2727
use state::State;
2828
mod monetary;
2929
mod rest;
@@ -45,7 +45,7 @@ const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution";
4545
const DEFAULT_SPO_DISTRIBUTION_TOPIC: &str = "cardano.spo.distribution";
4646
const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards";
4747
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";
48-
const DEFAULT_STAKE_DIFFS_TOPIC: &str = "cardano.stake.diffs";
48+
const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";
4949

5050
const DEFAULT_HANDLE_POTS_TOPIC: (&str, &str) = ("handle-topic-pots", "rest.get.pots");
5151

@@ -64,7 +64,7 @@ impl AccountsState {
6464
mut drep_publisher: DRepDistributionPublisher,
6565
mut spo_publisher: SPODistributionPublisher,
6666
mut spo_rewards_publisher: SPORewardsPublisher,
67-
mut stake_diffs_publisher: StakeDiffsPublisher,
67+
mut stake_reward_deltas_publisher: StakeRewardDeltasPublisher,
6868
mut spos_subscription: Box<dyn Subscription<Message>>,
6969
mut ea_subscription: Box<dyn Subscription<Message>>,
7070
mut certs_subscription: Box<dyn Subscription<Message>>,
@@ -117,7 +117,6 @@ impl AccountsState {
117117
let stake_message_f = stake_subscription.read();
118118
let withdrawals_message_f = withdrawals_subscription.read();
119119
let mut current_block: Option<BlockInfo> = None;
120-
let mut stake_diffs = Vec::<StakeAddressDiff>::new();
121120

122121
// Use certs_message as the synchroniser, but we have to handle it after the
123122
// epoch things, because they apply to the new epoch, not the last
@@ -201,20 +200,28 @@ impl AccountsState {
201200
);
202201
async {
203202
Self::check_sync(&current_block, &block_info);
204-
let spo_rewards = state
203+
let after_epoch_result = state
205204
.handle_epoch_activity(ea_msg)
206205
.await
207206
.inspect_err(|e| error!("EpochActivity handling error: {e:#}"))
208207
.ok();
209-
// SPO rewards is for previous epoch
210-
if let Some((spo_rewards, diffs)) = spo_rewards {
211-
if let Err(e) = spo_rewards_publisher
212-
.publish_spo_rewards(block_info, spo_rewards)
213-
.await
214-
{
208+
if let Some((spo_rewards, stake_reward_deltas)) = after_epoch_result {
209+
// SPO Rewards Future
210+
let spo_rewards_future = spo_rewards_publisher
211+
.publish_spo_rewards(block_info, spo_rewards);
212+
// Stake Reward Deltas Future
213+
let stake_reward_deltas_future = stake_reward_deltas_publisher
214+
.publish_stake_reward_deltas(block_info, stake_reward_deltas);
215+
216+
// publish in parallel
217+
let (spo_rewards_result, stake_reward_deltas_result) =
218+
join!(spo_rewards_future, stake_reward_deltas_future);
219+
spo_rewards_result.unwrap_or_else(|e| {
215220
error!("Error publishing SPO rewards: {e:#}")
216-
}
217-
stake_diffs.extend(diffs);
221+
});
222+
stake_reward_deltas_result.unwrap_or_else(|e| {
223+
error!("Error publishing stake reward deltas: {e:#}")
224+
});
218225
}
219226
}
220227
.instrument(span)
@@ -264,13 +271,10 @@ impl AccountsState {
264271
let span = info_span!("account_state.handle_certs", block = block_info.number);
265272
async {
266273
Self::check_sync(&current_block, &block_info);
267-
let diffs = state
274+
state
268275
.handle_tx_certificates(tx_certs_msg)
269276
.inspect_err(|e| error!("TxCertificates handling error: {e:#}"))
270277
.ok();
271-
if let Some(diffs) = diffs {
272-
stake_diffs.extend(diffs);
273-
}
274278
}
275279
.instrument(span)
276280
.await;
@@ -289,13 +293,10 @@ impl AccountsState {
289293
);
290294
async {
291295
Self::check_sync(&current_block, &block_info);
292-
let diffs = state
296+
state
293297
.handle_withdrawals(withdrawals_msg)
294298
.inspect_err(|e| error!("Withdrawals handling error: {e:#}"))
295299
.ok();
296-
if let Some(diffs) = diffs {
297-
stake_diffs.extend(diffs);
298-
}
299300
}
300301
.instrument(span)
301302
.await;
@@ -314,13 +315,10 @@ impl AccountsState {
314315
);
315316
async {
316317
Self::check_sync(&current_block, &block_info);
317-
let diffs = state
318+
state
318319
.handle_stake_deltas(deltas_msg)
319320
.inspect_err(|e| error!("StakeAddressDeltas handling error: {e:#}"))
320321
.ok();
321-
if let Some(diffs) = diffs {
322-
stake_diffs.extend(diffs);
323-
}
324322
}
325323
.instrument(span)
326324
.await;
@@ -331,13 +329,6 @@ impl AccountsState {
331329

332330
// Commit the new state
333331
if let Some(block_info) = current_block {
334-
// publish stake address diffs message
335-
if let Err(e) =
336-
stake_diffs_publisher.publish_stake_diffs(&block_info, stake_diffs).await
337-
{
338-
error!("Error publishing stake diffs: {e:#}")
339-
}
340-
341332
history.lock().await.commit(block_info.number, state);
342333
}
343334
}
@@ -409,10 +400,10 @@ impl AccountsState {
409400
.get_string("publish-spo-rewards-topic")
410401
.unwrap_or(DEFAULT_SPO_REWARDS_TOPIC.to_string());
411402

412-
let stake_diffs_topic = config
413-
.get_string("publish-stake-diffs-topic")
414-
.unwrap_or(DEFAULT_STAKE_DIFFS_TOPIC.to_string());
415-
info!("Creating stake diffs subscriber on '{stake_diffs_topic}'");
403+
let stake_reward_deltas_topic = config
404+
.get_string("publish-stake-reward-deltas-topic")
405+
.unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
406+
info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'");
416407

417408
// REST handler topics
418409
let handle_pots_topic = config
@@ -546,7 +537,8 @@ impl AccountsState {
546537
DRepDistributionPublisher::new(context.clone(), drep_distribution_topic);
547538
let spo_publisher = SPODistributionPublisher::new(context.clone(), spo_distribution_topic);
548539
let spo_rewards_publisher = SPORewardsPublisher::new(context.clone(), spo_rewards_topic);
549-
let stake_diffs_publisher = StakeDiffsPublisher::new(context.clone(), stake_diffs_topic);
540+
let stake_reward_deltas_publisher =
541+
StakeRewardDeltasPublisher::new(context.clone(), stake_reward_deltas_topic);
550542

551543
// Subscribe
552544
let spos_subscription = context.subscribe(&spo_state_topic).await?;
@@ -565,7 +557,7 @@ impl AccountsState {
565557
drep_publisher,
566558
spo_publisher,
567559
spo_rewards_publisher,
568-
stake_diffs_publisher,
560+
stake_reward_deltas_publisher,
569561
spos_subscription,
570562
ea_subscription,
571563
certs_subscription,

modules/accounts_state/src/stake_diffs_publisher.rs renamed to modules/accounts_state/src/stake_reward_deltas_publisher.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,37 @@
1-
use acropolis_common::messages::{CardanoMessage, Message, StakeAddressDiffsMessage};
2-
use acropolis_common::{BlockInfo, StakeAddressDiff};
1+
use acropolis_common::messages::{CardanoMessage, Message, StakeRewardDeltasMessage};
2+
use acropolis_common::{BlockInfo, StakeRewardDelta};
33
use caryatid_sdk::Context;
44
use std::sync::Arc;
55

6-
/// Message publisher for Stake Address Diffs
7-
pub struct StakeDiffsPublisher {
6+
/// Message publisher for Stake Reward Deltas
7+
pub struct StakeRewardDeltasPublisher {
88
/// Module context
99
context: Arc<Context<Message>>,
1010

1111
/// Topic to publish on
1212
topic: String,
1313
}
1414

15-
impl StakeDiffsPublisher {
15+
impl StakeRewardDeltasPublisher {
1616
/// Construct with context and topic to publish on
1717
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
1818
Self { context, topic }
1919
}
2020

2121
/// Publish the Stake Diffs
22-
pub async fn publish_stake_diffs(
22+
pub async fn publish_stake_reward_deltas(
2323
&mut self,
2424
block: &BlockInfo,
25-
stake_diffs: Vec<StakeAddressDiff>,
25+
stake_reward_deltas: Vec<StakeRewardDelta>,
2626
) -> anyhow::Result<()> {
2727
self.context
2828
.message_bus
2929
.publish(
3030
&self.topic,
3131
Arc::new(Message::Cardano((
3232
block.clone(),
33-
CardanoMessage::StakeAddressDiffs(StakeAddressDiffsMessage {
34-
diffs: stake_diffs,
33+
CardanoMessage::StakeRewardDeltas(StakeRewardDeltasMessage {
34+
deltas: stake_reward_deltas,
3535
}),
3636
))),
3737
)

0 commit comments

Comments
 (0)