Skip to content

Commit cd3ad66

Browse files
authored
Merge pull request #148 from input-output-hk/whankinsiv/prepare-asset-state-module
feat: assets_state module with total supply storage
2 parents e8c4a9b + 40b7f62 commit cd3ad66

File tree

19 files changed

+481
-25
lines changed

19 files changed

+481
-25
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ members = [
2020
"modules/stake_delta_filter", # Filters address deltas
2121
"modules/epoch_activity_counter", # Counts fees and block producers for rewards
2222
"modules/accounts_state", # Tracks stake and reward accounts
23+
"modules/assets_state", # Tracks native asset mints and burns
2324

2425
# Process builds
2526
"processes/omnibus", # All-inclusive omnibus process

common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ tracing = "0.1.40"
3333
futures = "0.3.31"
3434
minicbor = { version = "0.26.0", features = ["std", "half", "derive"] }
3535
num-traits = "0.2"
36+
imbl = { version = "5.0.0", features = ["serde"] }
3637

3738
[lib]
3839
crate-type = ["rlib"]

common/src/messages.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ pub struct UTXODeltasMessage {
6767
pub deltas: Vec<UTXODelta>,
6868
}
6969

70+
/// Message encapsulating multiple asset deltas
71+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
72+
pub struct AssetDeltasMessage {
73+
/// Ordered set of deltas
74+
pub deltas: NativeAssetsDelta,
75+
}
76+
7077
/// Message encapsulating multiple transaction certificates, in order
7178
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
7279
pub struct TxCertificatesMessage {
@@ -218,6 +225,7 @@ pub enum CardanoMessage {
218225
ReceivedTxs(RawTxsMessage), // Transaction available
219226
GenesisComplete(GenesisCompleteMessage), // Genesis UTXOs done + genesis params
220227
UTXODeltas(UTXODeltasMessage), // UTXO deltas received
228+
AssetDeltas(AssetDeltasMessage), // Asset mint and burn deltas
221229
TxCertificates(TxCertificatesMessage), // Transaction certificates received
222230
AddressDeltas(AddressDeltasMessage), // Address deltas received
223231
Withdrawals(WithdrawalsMessage), // Withdrawals from reward accounts

common/src/queries/assets.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
use crate::{AssetName, PolicyId};
2+
3+
pub const DEFAULT_ASSETS_QUERY_TOPIC: (&str, &str) =
4+
("assets-state-query-topic", "cardano.query.assets");
5+
16
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
27
pub enum AssetsStateQuery {
38
GetAssetsList,
@@ -10,7 +15,7 @@ pub enum AssetsStateQuery {
1015

1116
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1217
pub enum AssetsStateQueryResponse {
13-
AssetsList(AssetsList),
18+
AssetsList(imbl::HashMap<PolicyId, imbl::HashMap<AssetName, u64>>),
1419
AssetInfo(AssetInfo),
1520
AssetHistory(AssetHistory),
1621
AssetTransactions(AssetTransactions),

common/src/types.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,23 @@ pub struct StakeAddressDelta {
140140
pub delta: i64,
141141
}
142142

143+
pub type PolicyId = [u8; 28];
144+
pub type NativeAssets = Vec<(PolicyId, Vec<NativeAsset>)>;
145+
pub type NativeAssetsDelta = Vec<(PolicyId, Vec<NativeAssetDelta>)>;
146+
pub type AssetName = Vec<u8>;
147+
148+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
149+
pub struct NativeAsset {
150+
pub name: AssetName,
151+
pub amount: u64,
152+
}
153+
154+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
155+
pub struct NativeAssetDelta {
156+
pub name: AssetName,
157+
pub amount: i64,
158+
}
159+
143160
/// Value (lovelace + multiasset)
144161
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
145162
pub struct Value {
@@ -160,11 +177,11 @@ impl Value {
160177
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
161178
pub struct ValueDelta {
162179
pub lovelace: i64,
163-
pub assets: Vec<(PolicyId, Vec<NativeAssetDelta>)>,
180+
pub assets: NativeAssetsDelta,
164181
}
165182

166183
impl ValueDelta {
167-
pub fn new(lovelace: i64, assets: Vec<(PolicyId, Vec<NativeAssetDelta>)>) -> Self {
184+
pub fn new(lovelace: i64, assets: NativeAssetsDelta) -> Self {
168185
Self { lovelace, assets }
169186
}
170187
}
@@ -205,23 +222,6 @@ impl Neg for ValueDelta {
205222
}
206223
}
207224

208-
pub type PolicyId = [u8; 28];
209-
pub type NativeAssets = Vec<(PolicyId, Vec<NativeAsset>)>;
210-
pub type NativeAssetsDelta = Vec<(PolicyId, Vec<NativeAssetDelta>)>;
211-
pub type AssetName = Vec<u8>;
212-
213-
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
214-
pub struct NativeAsset {
215-
pub name: AssetName,
216-
pub amount: u64,
217-
}
218-
219-
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
220-
pub struct NativeAssetDelta {
221-
pub name: AssetName,
222-
pub amount: i64,
223-
}
224-
225225
/// Transaction output (UTXO)
226226
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
227227
pub struct TxOutput {

modules/assets_state/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Acropolis reward state module
2+
3+
[package]
4+
name = "acropolis_module_assets_state"
5+
version = "0.1.0"
6+
edition = "2021"
7+
authors = ["William Hankins <[email protected]>"]
8+
description = "Native Asset State Tracker"
9+
license = "Apache-2.0"
10+
11+
[dependencies]
12+
caryatid_sdk = "0.12"
13+
acropolis_common = { path = "../../common" }
14+
config = "0.15.11"
15+
tokio = { version = "1", features = ["full"] }
16+
tracing = "0.1.40"
17+
anyhow = "1.0"
18+
imbl = { version = "5.0.0", features = ["serde"] }
19+
20+
[lib]
21+
path = "src/assets_state.rs"
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
//! Acropolis Asset State module for Caryatid
2+
//! Accepts native asset mint and burn events
3+
//! and derives the Asset State in memory
4+
5+
use acropolis_common::{
6+
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
7+
queries::assets::{AssetsStateQuery, AssetsStateQueryResponse, DEFAULT_ASSETS_QUERY_TOPIC},
8+
state_history::{StateHistory, StateHistoryStore},
9+
BlockStatus,
10+
};
11+
use anyhow::Result;
12+
use caryatid_sdk::{module, Context, Module, Subscription};
13+
use config::Config;
14+
use std::sync::Arc;
15+
use tokio::sync::Mutex;
16+
use tracing::{error, info, info_span, Instrument};
17+
18+
use crate::state::{AssetsStorageConfig, State};
19+
mod state;
20+
21+
// Subscription topics
22+
const DEFAULT_ASSET_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) =
23+
("asset-deltas-subscribe-topic", "cardano.asset.deltas");
24+
25+
// Configuration defaults
26+
const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false);
27+
const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false);
28+
const DEFAULT_STORE_TRANSACTIONS: (&str, bool) = ("store-transactions", false);
29+
const DEFAULT_STORE_ADDRESSES: (&str, bool) = ("store-addresses", false);
30+
31+
/// Assets State module
32+
#[module(
33+
message_type(Message),
34+
name = "assets-state",
35+
description = "In-memory Assets State from asset mint and burn events"
36+
)]
37+
pub struct AssetsState;
38+
39+
impl AssetsState {
40+
async fn run(
41+
history: Arc<Mutex<StateHistory<State>>>,
42+
mut deltas_subscription: Box<dyn Subscription<Message>>,
43+
storage_config: AssetsStorageConfig,
44+
) -> Result<()> {
45+
// Main loop of synchronised messages
46+
loop {
47+
match deltas_subscription.read().await?.1.as_ref() {
48+
Message::Cardano((block, CardanoMessage::AssetDeltas(message))) => {
49+
let span = info_span!("assets_state.handle", epoch = block.epoch);
50+
async {
51+
// Get current state and current params
52+
let mut state = {
53+
let mut h = history.lock().await;
54+
h.get_or_init_with(|| State::new(&storage_config))
55+
};
56+
57+
// Handle rollback if needed
58+
if block.status == BlockStatus::RolledBack {
59+
state = history.lock().await.get_rolled_back_state(block.epoch);
60+
}
61+
62+
// Process deltas
63+
state = match state.handle_deltas(&message.deltas) {
64+
Ok(new_state) => new_state,
65+
Err(e) => {
66+
error!("Asset deltas handling error: {e:#}");
67+
state
68+
}
69+
};
70+
71+
// Commit state
72+
{
73+
let mut h = history.lock().await;
74+
h.commit(block.epoch, state);
75+
}
76+
77+
Ok::<(), anyhow::Error>(())
78+
}
79+
.instrument(span)
80+
.await?;
81+
}
82+
msg => error!("Unexpected message {msg:?} for enact state topic"),
83+
}
84+
}
85+
}
86+
87+
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
88+
fn get_bool_flag(config: &Config, key: (&str, bool)) -> bool {
89+
config.get_bool(key.0).unwrap_or(key.1)
90+
}
91+
92+
fn get_string_flag(config: &Config, key: (&str, &str)) -> String {
93+
config.get_string(key.0).unwrap_or_else(|_| key.1.to_string())
94+
}
95+
96+
// Get configuration flags and topis
97+
let storage_config = AssetsStorageConfig {
98+
_store_info: get_bool_flag(&config, DEFAULT_STORE_INFO),
99+
_store_history: get_bool_flag(&config, DEFAULT_STORE_HISTORY),
100+
_store_transactions: get_bool_flag(&config, DEFAULT_STORE_TRANSACTIONS),
101+
_store_addresses: get_bool_flag(&config, DEFAULT_STORE_ADDRESSES),
102+
};
103+
104+
let asset_deltas_subscribe_topic =
105+
get_string_flag(&config, DEFAULT_ASSET_DELTAS_SUBSCRIBE_TOPIC);
106+
info!("Creating subscriber on '{asset_deltas_subscribe_topic}'");
107+
108+
let assets_query_topic = get_string_flag(&config, DEFAULT_ASSETS_QUERY_TOPIC);
109+
info!("Creating asset query handler on '{assets_query_topic}'");
110+
111+
// Initalize state history
112+
let history = Arc::new(Mutex::new(StateHistory::<State>::new(
113+
"AssetsState",
114+
StateHistoryStore::default_block_store(),
115+
)));
116+
let history_run = history.clone();
117+
let query_history = history.clone();
118+
let ticker_history = history.clone();
119+
120+
// Query handler
121+
context.handle(&assets_query_topic, move |message| {
122+
let history = query_history.clone();
123+
async move {
124+
let Message::StateQuery(StateQuery::Assets(query)) = message.as_ref() else {
125+
return Arc::new(Message::StateQueryResponse(StateQueryResponse::Assets(
126+
AssetsStateQueryResponse::Error("Invalid message for assets-state".into()),
127+
)));
128+
};
129+
130+
let state = history.lock().await.get_current_state();
131+
132+
let response = match query {
133+
AssetsStateQuery::GetAssetsList => {
134+
AssetsStateQueryResponse::AssetsList(state.assets)
135+
}
136+
_ => AssetsStateQueryResponse::Error(format!(
137+
"Unimplemented assets query: {query:?}"
138+
)),
139+
};
140+
Arc::new(Message::StateQueryResponse(StateQueryResponse::Assets(
141+
response,
142+
)))
143+
}
144+
});
145+
146+
// Ticker to log stats
147+
let mut subscription = context.subscribe("clock.tick").await?;
148+
context.run(async move {
149+
loop {
150+
let Ok((_, message)) = subscription.read().await else {
151+
return;
152+
};
153+
if let Message::Clock(message) = message.as_ref() {
154+
if (message.number % 60) == 0 {
155+
let span = info_span!("assets_state.tick", number = message.number);
156+
async {
157+
ticker_history
158+
.lock()
159+
.await
160+
.get_current_state()
161+
.tick()
162+
.await
163+
.inspect_err(|e| error!("Tick error: {e}"))
164+
.ok();
165+
}
166+
.instrument(span)
167+
.await;
168+
}
169+
}
170+
}
171+
});
172+
173+
// Subscribe to enabled topics
174+
let deltas_sub = context.subscribe(&asset_deltas_subscribe_topic).await?;
175+
176+
// Start run task
177+
context.run(async move {
178+
Self::run(history_run, deltas_sub, storage_config)
179+
.await
180+
.unwrap_or_else(|e| error!("Failed: {e}"));
181+
});
182+
183+
Ok(())
184+
}
185+
}

0 commit comments

Comments
 (0)