4
4
5
5
use crate :: {
6
6
asset_registry:: AssetRegistry ,
7
- state:: { AssetsStorageConfig , State } ,
7
+ state:: { AssetsStorageConfig , State , StoreTransactions } ,
8
8
} ;
9
9
use acropolis_common:: {
10
10
messages:: { CardanoMessage , Message , StateQuery , StateQueryResponse } ,
@@ -26,12 +26,14 @@ const DEFAULT_ASSET_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) =
26
26
( "asset-deltas-subscribe-topic" , "cardano.asset.deltas" ) ;
27
27
const DEFAULT_UTXO_DELTAS_SUBSCRIBE_TOPIC : ( & str , & str ) =
28
28
( "utxo-deltas-subscribe-topic" , "cardano.utxo.deltas" ) ;
29
+ const DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC : ( & str , & str ) =
30
+ ( "address-deltas-subscribe-topic" , "cardano.address.delta" ) ;
29
31
30
32
// Configuration defaults
31
33
const DEFAULT_STORE_ASSETS : ( & str , bool ) = ( "store-assets" , false ) ;
32
34
const DEFAULT_STORE_INFO : ( & str , bool ) = ( "store-info" , false ) ;
33
35
const DEFAULT_STORE_HISTORY : ( & str , bool ) = ( "store-history" , false ) ;
34
- const DEFAULT_STORE_TRANSACTIONS : ( & str , bool ) = ( "store-transactions" , false ) ;
36
+ const DEFAULT_STORE_TRANSACTIONS : ( & str , & str ) = ( "store-transactions" , "none" ) ;
35
37
const DEFAULT_STORE_ADDRESSES : ( & str , bool ) = ( "store-addresses" , false ) ;
36
38
const DEFAULT_INDEX_BY_POLICY : ( & str , bool ) = ( "index-by-policy" , false ) ;
37
39
@@ -48,13 +50,18 @@ impl AssetsState {
48
50
history : Arc < Mutex < StateHistory < State > > > ,
49
51
mut asset_deltas_subscription : Box < dyn Subscription < Message > > ,
50
52
mut utxo_deltas_subscription : Option < Box < dyn Subscription < Message > > > ,
53
+ mut address_deltas_subscription : Option < Box < dyn Subscription < Message > > > ,
51
54
storage_config : AssetsStorageConfig ,
52
55
registry : Arc < Mutex < AssetRegistry > > ,
53
56
) -> Result < ( ) > {
54
57
if let Some ( sub) = utxo_deltas_subscription. as_mut ( ) {
55
58
let _ = sub. read ( ) . await ?;
56
59
info ! ( "Consumed initial message from utxo_deltas_subscription" ) ;
57
60
}
61
+ if let Some ( sub) = address_deltas_subscription. as_mut ( ) {
62
+ let _ = sub. read ( ) . await ?;
63
+ info ! ( "Consumed initial message from address_deltas_subscription" ) ;
64
+ }
58
65
// Main loop of synchronised messages
59
66
loop {
60
67
// Get current state snapshot
@@ -115,20 +122,58 @@ impl AssetsState {
115
122
CardanoMessage :: UTXODeltas ( utxo_deltas_msg) ,
116
123
) ) => {
117
124
Self :: check_sync ( & current_block, block_info, "utxo" ) ;
118
- let mut reg = registry. lock ( ) . await ;
119
- state =
120
- match state. handle_cip68_metadata ( & utxo_deltas_msg. deltas , & mut * reg) {
125
+
126
+ if storage_config. store_info {
127
+ let reg = registry. lock ( ) . await ;
128
+ state =
129
+ match state. handle_cip68_metadata ( & utxo_deltas_msg. deltas , & * reg) {
130
+ Ok ( new_state) => new_state,
131
+ Err ( e) => {
132
+ error ! ( "CIP-68 metadata handling error: {e:#}" ) ;
133
+ state
134
+ }
135
+ } ;
136
+ }
137
+
138
+ if storage_config. store_transactions . is_enabled ( ) {
139
+ let reg = registry. lock ( ) . await ;
140
+ state = match state. handle_transactions ( & utxo_deltas_msg. deltas , & * reg)
141
+ {
121
142
Ok ( new_state) => new_state,
122
143
Err ( e) => {
123
- error ! ( "CIP-68 metadata handling error: {e:#}" ) ;
144
+ error ! ( "Transactions handling error: {e:#}" ) ;
124
145
state
125
146
}
126
147
} ;
148
+ }
127
149
}
128
150
other => error ! ( "Unexpected message on utxo-deltas subscription: {other:?}" ) ,
129
151
}
130
152
}
131
153
154
+ if let Some ( sub) = address_deltas_subscription. as_mut ( ) {
155
+ let ( _, address_msg) = sub. read ( ) . await ?;
156
+ match address_msg. as_ref ( ) {
157
+ Message :: Cardano ( (
158
+ ref block_info,
159
+ CardanoMessage :: AddressDeltas ( address_deltas_msg) ,
160
+ ) ) => {
161
+ Self :: check_sync ( & current_block, block_info, "address" ) ;
162
+
163
+ let reg = registry. lock ( ) . await ;
164
+ state = match state. handle_address_deltas ( & address_deltas_msg. deltas , & * reg)
165
+ {
166
+ Ok ( new_state) => new_state,
167
+ Err ( e) => {
168
+ error ! ( "Address deltas handling error: {e:#}" ) ;
169
+ state
170
+ }
171
+ } ;
172
+ }
173
+ other => error ! ( "Unexpected message on address-deltas subscription: {other:?}" ) ,
174
+ }
175
+ }
176
+
132
177
// Commit state
133
178
{
134
179
let mut h = history. lock ( ) . await ;
@@ -165,12 +210,27 @@ impl AssetsState {
165
210
config. get_string ( key. 0 ) . unwrap_or_else ( |_| key. 1 . to_string ( ) )
166
211
}
167
212
213
+ fn get_transactions_flag ( config : & Config , key : ( & str , & str ) ) -> StoreTransactions {
214
+ let val = get_string_flag ( config, key) ;
215
+ match val. as_str ( ) {
216
+ "none" => StoreTransactions :: None ,
217
+ "all" => StoreTransactions :: All ,
218
+ s => {
219
+ if let Ok ( n) = s. parse :: < u64 > ( ) {
220
+ StoreTransactions :: Last ( n)
221
+ } else {
222
+ StoreTransactions :: None
223
+ }
224
+ }
225
+ }
226
+ }
227
+
168
228
// Get configuration flags and topis
169
229
let storage_config = AssetsStorageConfig {
170
230
store_assets : get_bool_flag ( & config, DEFAULT_STORE_ASSETS ) ,
171
231
store_info : get_bool_flag ( & config, DEFAULT_STORE_INFO ) ,
172
232
store_history : get_bool_flag ( & config, DEFAULT_STORE_HISTORY ) ,
173
- store_transactions : get_bool_flag ( & config, DEFAULT_STORE_TRANSACTIONS ) ,
233
+ store_transactions : get_transactions_flag ( & config, DEFAULT_STORE_TRANSACTIONS ) ,
174
234
store_addresses : get_bool_flag ( & config, DEFAULT_STORE_ADDRESSES ) ,
175
235
index_by_policy : get_bool_flag ( & config, DEFAULT_INDEX_BY_POLICY ) ,
176
236
} ;
@@ -180,13 +240,22 @@ impl AssetsState {
180
240
info ! ( "Creating subscriber on '{asset_deltas_subscribe_topic}'" ) ;
181
241
182
242
let utxo_deltas_subscribe_topic: Option < String > =
183
- if storage_config. store_info || storage_config. store_transactions {
243
+ if storage_config. store_info || storage_config. store_transactions . is_enabled ( ) {
184
244
let topic = get_string_flag ( & config, DEFAULT_UTXO_DELTAS_SUBSCRIBE_TOPIC ) ;
185
245
info ! ( "Creating subscriber on '{topic}'" ) ;
186
246
Some ( topic)
187
247
} else {
188
248
None
189
249
} ;
250
+
251
+ let address_deltas_subscribe_topic: Option < String > = if storage_config. store_addresses {
252
+ let topic = get_string_flag ( & config, DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC ) ;
253
+ info ! ( "Creating subscriber on '{topic}'" ) ;
254
+ Some ( topic)
255
+ } else {
256
+ None
257
+ } ;
258
+
190
259
let assets_query_topic = get_string_flag ( & config, DEFAULT_ASSETS_QUERY_TOPIC ) ;
191
260
info ! ( "Creating asset query handler on '{assets_query_topic}'" ) ;
192
261
@@ -215,7 +284,10 @@ impl AssetsState {
215
284
) ) ) ;
216
285
} ;
217
286
218
- let state = history. lock ( ) . await . get_current_state ( ) ;
287
+ let state = {
288
+ let h = history. lock ( ) . await ;
289
+ h. get_current_state ( )
290
+ } ;
219
291
220
292
let response = match query {
221
293
AssetsStateQuery :: GetAssetsList => {
@@ -295,7 +367,7 @@ impl AssetsState {
295
367
Err ( e) => AssetsStateQueryResponse :: Error ( e. to_string ( ) ) ,
296
368
} ,
297
369
None => {
298
- if state. config . store_transactions {
370
+ if state. config . store_transactions . is_enabled ( ) {
299
371
AssetsStateQueryResponse :: NotFound
300
372
} else {
301
373
AssetsStateQueryResponse :: Error (
@@ -354,13 +426,19 @@ impl AssetsState {
354
426
} else {
355
427
None
356
428
} ;
429
+ let address_deltas_sub = if let Some ( topic) = & address_deltas_subscribe_topic {
430
+ Some ( context. subscribe ( topic) . await ?)
431
+ } else {
432
+ None
433
+ } ;
357
434
358
435
// Start run task
359
436
context. run ( async move {
360
437
Self :: run (
361
438
history_run,
362
439
asset_deltas_sub,
363
440
utxo_deltas_sub,
441
+ address_deltas_sub,
364
442
storage_config,
365
443
registry_run,
366
444
)
0 commit comments