@@ -6,7 +6,7 @@ use acropolis_common::{
6
6
queries:: accounts:: { PoolsLiveStakes , DEFAULT_ACCOUNTS_QUERY_TOPIC } ,
7
7
rest_helper:: handle_rest,
8
8
state_history:: { StateHistory , StateHistoryStore } ,
9
- BlockInfo , BlockStatus ,
9
+ BlockInfo , BlockStatus , StakeAddressDiff ,
10
10
} ;
11
11
use anyhow:: Result ;
12
12
use caryatid_sdk:: { message_bus:: Subscription , module, Context , Module } ;
@@ -21,7 +21,9 @@ mod spo_distribution_publisher;
21
21
use spo_distribution_publisher:: SPODistributionPublisher ;
22
22
mod spo_rewards_publisher;
23
23
use spo_rewards_publisher:: SPORewardsPublisher ;
24
+ mod stake_diffs_publisher;
24
25
mod state;
26
+ use stake_diffs_publisher:: StakeDiffsPublisher ;
25
27
use state:: State ;
26
28
mod monetary;
27
29
mod rest;
@@ -43,6 +45,7 @@ const DEFAULT_DREP_DISTRIBUTION_TOPIC: &str = "cardano.drep.distribution";
43
45
const DEFAULT_SPO_DISTRIBUTION_TOPIC : & str = "cardano.spo.distribution" ;
44
46
const DEFAULT_SPO_REWARDS_TOPIC : & str = "cardano.spo.rewards" ;
45
47
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC : & str = "cardano.protocol.parameters" ;
48
+ const DEFAULT_STAKE_DIFFS_TOPIC : & str = "cardano.stake.diffs" ;
46
49
47
50
const DEFAULT_HANDLE_POTS_TOPIC : ( & str , & str ) = ( "handle-topic-pots" , "rest.get.pots" ) ;
48
51
@@ -61,6 +64,7 @@ impl AccountsState {
61
64
mut drep_publisher : DRepDistributionPublisher ,
62
65
mut spo_publisher : SPODistributionPublisher ,
63
66
mut spo_rewards_publisher : SPORewardsPublisher ,
67
+ mut stake_diffs_publisher : StakeDiffsPublisher ,
64
68
mut spos_subscription : Box < dyn Subscription < Message > > ,
65
69
mut ea_subscription : Box < dyn Subscription < Message > > ,
66
70
mut certs_subscription : Box < dyn Subscription < Message > > ,
@@ -113,6 +117,7 @@ impl AccountsState {
113
117
let stake_message_f = stake_subscription. read ( ) ;
114
118
let withdrawals_message_f = withdrawals_subscription. read ( ) ;
115
119
let mut current_block: Option < BlockInfo > = None ;
120
+ let mut stake_diffs = Vec :: < StakeAddressDiff > :: new ( ) ;
116
121
117
122
// Use certs_message as the synchroniser, but we have to handle it after the
118
123
// epoch things, because they apply to the new epoch, not the last
@@ -202,13 +207,14 @@ impl AccountsState {
202
207
. inspect_err ( |e| error ! ( "EpochActivity handling error: {e:#}" ) )
203
208
. ok ( ) ;
204
209
// SPO rewards is for previous epoch
205
- if let Some ( spo_rewards) = spo_rewards {
210
+ if let Some ( ( spo_rewards, diffs ) ) = spo_rewards {
206
211
if let Err ( e) = spo_rewards_publisher
207
212
. publish_spo_rewards ( block_info, spo_rewards)
208
213
. await
209
214
{
210
215
error ! ( "Error publishing SPO rewards: {e:#}" )
211
216
}
217
+ stake_diffs. extend ( diffs) ;
212
218
}
213
219
}
214
220
. instrument ( span)
@@ -258,10 +264,13 @@ impl AccountsState {
258
264
let span = info_span ! ( "account_state.handle_certs" , block = block_info. number) ;
259
265
async {
260
266
Self :: check_sync ( & current_block, & block_info) ;
261
- state
267
+ let diffs = state
262
268
. handle_tx_certificates ( tx_certs_msg)
263
269
. inspect_err ( |e| error ! ( "TxCertificates handling error: {e:#}" ) )
264
270
. ok ( ) ;
271
+ if let Some ( diffs) = diffs {
272
+ stake_diffs. extend ( diffs) ;
273
+ }
265
274
}
266
275
. instrument ( span)
267
276
. await ;
@@ -280,10 +289,13 @@ impl AccountsState {
280
289
) ;
281
290
async {
282
291
Self :: check_sync ( & current_block, & block_info) ;
283
- state
292
+ let diffs = state
284
293
. handle_withdrawals ( withdrawals_msg)
285
294
. inspect_err ( |e| error ! ( "Withdrawals handling error: {e:#}" ) )
286
295
. ok ( ) ;
296
+ if let Some ( diffs) = diffs {
297
+ stake_diffs. extend ( diffs) ;
298
+ }
287
299
}
288
300
. instrument ( span)
289
301
. await ;
@@ -302,10 +314,13 @@ impl AccountsState {
302
314
) ;
303
315
async {
304
316
Self :: check_sync ( & current_block, & block_info) ;
305
- state
317
+ let diffs = state
306
318
. handle_stake_deltas ( deltas_msg)
307
319
. inspect_err ( |e| error ! ( "StakeAddressDeltas handling error: {e:#}" ) )
308
320
. ok ( ) ;
321
+ if let Some ( diffs) = diffs {
322
+ stake_diffs. extend ( diffs) ;
323
+ }
309
324
}
310
325
. instrument ( span)
311
326
. await ;
@@ -316,6 +331,13 @@ impl AccountsState {
316
331
317
332
// Commit the new state
318
333
if let Some ( block_info) = current_block {
334
+ // publish stake address diffs message
335
+ if let Err ( e) =
336
+ stake_diffs_publisher. publish_stake_diffs ( & block_info, stake_diffs) . await
337
+ {
338
+ error ! ( "Error publishing stake diffs: {e:#}" )
339
+ }
340
+
319
341
history. lock ( ) . await . commit ( block_info. number , state) ;
320
342
}
321
343
}
@@ -387,6 +409,11 @@ impl AccountsState {
387
409
. get_string ( "publish-spo-rewards-topic" )
388
410
. unwrap_or ( DEFAULT_SPO_REWARDS_TOPIC . to_string ( ) ) ;
389
411
412
+ let stake_diffs_topic = config
413
+ . get_string ( "publish-stake-diffs-topic" )
414
+ . unwrap_or ( DEFAULT_STAKE_DIFFS_TOPIC . to_string ( ) ) ;
415
+ info ! ( "Creating stake diffs subscriber on '{stake_diffs_topic}'" ) ;
416
+
390
417
// REST handler topics
391
418
let handle_pots_topic = config
392
419
. get_string ( DEFAULT_HANDLE_POTS_TOPIC . 0 )
@@ -519,6 +546,7 @@ impl AccountsState {
519
546
DRepDistributionPublisher :: new ( context. clone ( ) , drep_distribution_topic) ;
520
547
let spo_publisher = SPODistributionPublisher :: new ( context. clone ( ) , spo_distribution_topic) ;
521
548
let spo_rewards_publisher = SPORewardsPublisher :: new ( context. clone ( ) , spo_rewards_topic) ;
549
+ let stake_diffs_publisher = StakeDiffsPublisher :: new ( context. clone ( ) , stake_diffs_topic) ;
522
550
523
551
// Subscribe
524
552
let spos_subscription = context. subscribe ( & spo_state_topic) . await ?;
@@ -537,6 +565,7 @@ impl AccountsState {
537
565
drep_publisher,
538
566
spo_publisher,
539
567
spo_rewards_publisher,
568
+ stake_diffs_publisher,
540
569
spos_subscription,
541
570
ea_subscription,
542
571
certs_subscription,
0 commit comments