@@ -16,7 +16,7 @@ use config::Config;
16
16
use pallas:: ledger:: traverse:: MultiEraHeader ;
17
17
use std:: sync:: Arc ;
18
18
use tokio:: sync:: Mutex ;
19
- use tracing:: { error, info, info_span, Instrument } ;
19
+ use tracing:: { error, info, info_span} ;
20
20
21
21
mod epoch_activity_publisher;
22
22
mod epochs_history;
@@ -35,12 +35,13 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
35
35
) ;
36
36
const DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC : ( & str , & str ) =
37
37
( "block-header-subscribe-topic" , "cardano.block.header" ) ;
38
- const DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC : ( & str , & str ) =
39
- ( "block-fees -subscribe-topic" , "cardano.block.fees " ) ;
38
+ const DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC : ( & str , & str ) =
39
+ ( "block-txs -subscribe-topic" , "cardano.block.txs " ) ;
40
40
const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC : ( & str , & str ) = (
41
41
"protocol-parameters-subscribe-topic" ,
42
42
"cardano.protocol.parameters" ,
43
43
) ;
44
+
44
45
const DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC : ( & str , & str ) =
45
46
( "epoch-activity-publish-topic" , "cardano.epoch.activity" ) ;
46
47
@@ -59,7 +60,7 @@ impl EpochsState {
59
60
epochs_history : EpochsHistoryState ,
60
61
mut bootstrapped_subscription : Box < dyn Subscription < Message > > ,
61
62
mut headers_subscription : Box < dyn Subscription < Message > > ,
62
- mut fees_subscription : Box < dyn Subscription < Message > > ,
63
+ mut block_txs_subscription : Box < dyn Subscription < Message > > ,
63
64
mut protocol_parameters_subscription : Box < dyn Subscription < Message > > ,
64
65
mut epoch_activity_publisher : EpochActivityPublisher ,
65
66
) -> Result < ( ) > {
@@ -76,12 +77,12 @@ impl EpochsState {
76
77
77
78
loop {
78
79
// Get a mutable state
79
- let mut state = history. lock ( ) . await . get_or_init_with ( || State :: new ( ) ) ;
80
+ let mut state = history. lock ( ) . await . get_or_init_with ( || State :: new ( & genesis ) ) ;
80
81
let mut current_block: Option < BlockInfo > = None ;
81
82
82
83
// Read both topics in parallel
83
84
let headers_message_f = headers_subscription. read ( ) ;
84
- let fees_message_f = fees_subscription . read ( ) ;
85
+ let block_txs_message_f = block_txs_subscription . read ( ) ;
85
86
86
87
// Handle headers first
87
88
let ( _, message) = headers_message_f. await ?;
@@ -166,18 +167,16 @@ impl EpochsState {
166
167
_ => error ! ( "Unexpected message type: {message:?}" ) ,
167
168
}
168
169
169
- // Handle block fees second so new epoch's fees don't get counted in the last one
170
- let ( _, message) = fees_message_f . await ?;
170
+ // Handle block txs second so new epoch's state don't get counted in the last one
171
+ let ( _, message) = block_txs_message_f . await ?;
171
172
match message. as_ref ( ) {
172
- Message :: Cardano ( ( block_info, CardanoMessage :: BlockFees ( fees_msg ) ) ) => {
173
+ Message :: Cardano ( ( block_info, CardanoMessage :: BlockTxs ( txs_msg ) ) ) => {
173
174
let span =
174
- info_span ! ( "epochs_state.handle_block_fees " , block = block_info. number) ;
175
- async {
175
+ info_span ! ( "epochs_state.handle_block_txs " , block = block_info. number) ;
176
+ span . in_scope ( || {
176
177
Self :: check_sync ( & current_block, & block_info) ;
177
- state. handle_fees ( & block_info, fees_msg. total_fees ) ;
178
- }
179
- . instrument ( span)
180
- . await ;
178
+ state. handle_block_txs ( & block_info, txs_msg) ;
179
+ } ) ;
181
180
}
182
181
183
182
_ => error ! ( "Unexpected message type: {message:?}" ) ,
@@ -203,10 +202,10 @@ impl EpochsState {
203
202
. unwrap_or ( DEFAULT_BLOCK_HEADER_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
204
203
info ! ( "Creating subscriber for headers on '{block_headers_subscribe_topic}'" ) ;
205
204
206
- let block_fees_subscribe_topic = config
207
- . get_string ( DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC . 0 )
208
- . unwrap_or ( DEFAULT_BLOCK_FEES_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
209
- info ! ( "Creating subscriber for fees on '{block_fees_subscribe_topic }'" ) ;
205
+ let block_txs_subscribe_topic = config
206
+ . get_string ( DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC . 0 )
207
+ . unwrap_or ( DEFAULT_BLOCK_TXS_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
208
+ info ! ( "Creating subscriber for block txs on '{block_txs_subscribe_topic }'" ) ;
210
209
211
210
let protocol_parameters_subscribe_topic = config
212
211
. get_string ( DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC . 0 )
@@ -242,9 +241,9 @@ impl EpochsState {
242
241
// Subscribe
243
242
let bootstrapped_subscription = context. subscribe ( & bootstrapped_subscribe_topic) . await ?;
244
243
let headers_subscription = context. subscribe ( & block_headers_subscribe_topic) . await ?;
245
- let fees_subscription = context. subscribe ( & block_fees_subscribe_topic) . await ?;
246
244
let protocol_parameters_subscription =
247
245
context. subscribe ( & protocol_parameters_subscribe_topic) . await ?;
246
+ let block_txs_subscription = context. subscribe ( & block_txs_subscribe_topic) . await ?;
248
247
249
248
// Publisher
250
249
let epoch_activity_publisher =
@@ -306,7 +305,7 @@ impl EpochsState {
306
305
epochs_history,
307
306
bootstrapped_subscription,
308
307
headers_subscription,
309
- fees_subscription ,
308
+ block_txs_subscription ,
310
309
protocol_parameters_subscription,
311
310
epoch_activity_publisher,
312
311
)
0 commit comments