From 980c381a5a26d694fece73fd373c3778b4fa9291 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Fri, 21 Jun 2024 13:29:00 +0300 Subject: [PATCH] save state changes in the new database crate --- Cargo.lock | 80 +++--- database-new/Cargo.toml | 2 +- database-new/src/postgres/state_indexer.rs | 272 ++++++++++++++++++++- 3 files changed, 308 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f502df86..7add1684 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1225,11 +1225,11 @@ dependencies = [ [[package]] name = "borsh" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbe5b10e214954177fb1dc9fbd20a1a2608fe99e6c832033bdc7cea287a20d77" +checksum = "a6362ed55def622cddc70a4746a68554d7b687713770de539e59a739b249f8ed" dependencies = [ - "borsh-derive 1.5.0", + "borsh-derive 1.5.1", "cfg_aliases", ] @@ -1248,9 +1248,9 @@ dependencies = [ [[package]] name = "borsh-derive" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a8646f94ab393e43e8b35a2558b1624bed28b97ee09c5d15456e3c9463f46d" +checksum = "c3ef8005764f53cd4dca619f5bf64cafd4664dada50ece25e4d81de54c80cc0b" dependencies = [ "once_cell", "proc-macro-crate 3.1.0", @@ -1438,9 +1438,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "cfg_aliases" -version = "0.1.1" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" @@ -2095,7 +2095,7 @@ dependencies = [ "anyhow", "async-trait", "bigdecimal 0.4.5", - "borsh 1.5.0", + "borsh 1.5.1", "bytes", "configuration", "diesel", @@ -2124,20 +2124,20 @@ dependencies = [ [[package]] name = "database-new" -version = "0.2.9" +version = "0.2.10" dependencies = [ "anyhow", "async-trait", "bigdecimal 0.3.1", - "borsh 1.5.0", + "borsh 1.5.1", "configuration", "futures", "hex", "lazy_static", - "near-chain-configs 1.39.1", - "near-crypto 1.39.1", + "near-chain-configs 1.40.0", + "near-crypto 1.40.0", "near-indexer-primitives", - "near-primitives 1.39.1", + "near-primitives 1.40.0", "prometheus", "readnode-primitives", "serde_json", @@ -4045,7 +4045,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35cbb989542587b47205e608324ddd391f0cee1c22b4b64ae49f458334b95907" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "serde", ] @@ -4092,7 +4092,7 @@ source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de dependencies = [ "actix", "assert_matches", - "borsh 1.5.0", + "borsh 1.5.1", "bytesize", "chrono", "crossbeam-channel", @@ -4198,7 +4198,7 @@ version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ "actix", - "borsh 1.5.0", + "borsh 1.5.1", "chrono", "derive_more", "futures", @@ -4243,7 +4243,7 @@ dependencies = [ "actix-rt", "anyhow", "async-trait", - "borsh 1.5.0", + "borsh 1.5.1", "bytesize", "chrono", "cloud-storage", @@ -4343,7 +4343,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2991d2912218a80ec0733ac87f84fa803accea105611eea209d4419271957667" dependencies = [ "blake2", - "borsh 1.5.0", + "borsh 1.5.1", "bs58", "c2-chacha", "curve25519-dalek", @@ -4369,7 +4369,7 @@ version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ "blake2", - "borsh 1.5.0", + "borsh 1.5.1", "bs58", "curve25519-dalek", "derive_more", @@ -4411,7 +4411,7 @@ name = "near-epoch-manager" version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "itertools 0.10.5", "near-cache", "near-chain-configs 1.40.0", @@ -4523,7 +4523,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18ad81e015f7aced8925d5b9ba3f369b36da9575c15812cfd0786bc1213284ca" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "lazy_static", "log", "near-chain-configs 0.20.1", @@ -4541,7 +4541,7 @@ name = "near-jsonrpc-client" version = "0.9.0" source = "git+https://github.com/kobayurii/near-jsonrpc-client-rs.git?branch=0.10.0#77ece899657045362e7dbfcad3c409e2ee9a5ef5" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "lazy_static", "log", "near-chain-configs 1.40.0", @@ -4645,7 +4645,7 @@ dependencies = [ "anyhow", "arc-swap", "async-trait", - "borsh 1.5.0", + "borsh 1.5.1", "bytes", "bytesize", "chrono", @@ -4746,7 +4746,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9f16a59b6c3e69b0585be951af6fe42a0ba86c0e207cb8c63badd19efd16680" dependencies = [ "assert_matches", - "borsh 1.5.0", + "borsh 1.5.1", "enum-map", "near-account-id", "near-primitives-core 0.20.1", @@ -4763,7 +4763,7 @@ name = "near-parameters" version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "enum-map", "near-account-id", "near-primitives-core 1.40.0", @@ -4805,7 +4805,7 @@ name = "near-pool" version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "near-crypto 1.40.0", "near-o11y 1.40.0", "near-primitives 1.40.0", @@ -4821,7 +4821,7 @@ checksum = "0462b067732132babcc89d5577db3bfcb0a1bcfbaaed3f2db4c11cd033666314" dependencies = [ "arbitrary", "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "bytesize", "cfg-if 1.0.0", "chrono", @@ -4862,7 +4862,7 @@ source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de dependencies = [ "arbitrary", "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "bytes", "bytesize", "cfg-if 1.0.0", @@ -4906,7 +4906,7 @@ checksum = "8443eb718606f572c438be6321a097a8ebd69f8e48d953885b4f16601af88225" dependencies = [ "arbitrary", "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "bs58", "derive_more", "enum-map", @@ -4927,7 +4927,7 @@ source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de dependencies = [ "arbitrary", "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "bs58", "derive_more", "enum-map", @@ -5020,7 +5020,7 @@ dependencies = [ "actix", "actix-web", "anyhow", - "borsh 1.5.0", + "borsh 1.5.1", "clap", "configuration", "database", @@ -5060,7 +5060,7 @@ dependencies = [ "actix", "actix-rt", "anyhow", - "borsh 1.5.0", + "borsh 1.5.1", "bytesize", "crossbeam", "derive_more", @@ -5180,7 +5180,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c56c80bdb1954808f59bd36a9112377197b38d424991383bf05f52d0fe2e0da5" dependencies = [ "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "ed25519-dalek", "enum-map", "memoffset 0.8.0", @@ -5210,7 +5210,7 @@ source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de dependencies = [ "anyhow", "base64 0.21.7", - "borsh 1.5.0", + "borsh 1.5.1", "ed25519-dalek", "enum-map", "finite-wasm", @@ -5307,7 +5307,7 @@ dependencies = [ "actix-web", "anyhow", "awc", - "borsh 1.5.0", + "borsh 1.5.1", "bytesize", "chrono", "cloud-storage", @@ -5392,7 +5392,7 @@ name = "node-runtime" version = "1.40.0" source = "git+https://github.com/kobayurii/nearcore.git?branch=1.40.0-fork#866de3aab2f87bd4d947d6954fb10f29adb5cb1a" dependencies = [ - "borsh 1.5.0", + "borsh 1.5.1", "hex", "near-chain-configs 1.40.0", "near-crypto 1.40.0", @@ -6735,7 +6735,7 @@ dependencies = [ "actix-web", "anyhow", "assert-json-diff", - "borsh 1.5.0", + "borsh 1.5.1", "chrono", "configuration", "database-new", @@ -6776,7 +6776,7 @@ name = "readnode-primitives" version = "0.2.10" dependencies = [ "anyhow", - "borsh 1.5.0", + "borsh 1.5.1", "near-chain-configs 1.40.0", "near-indexer-primitives", "num-traits", @@ -8053,7 +8053,7 @@ version = "0.2.10" dependencies = [ "actix-web", "anyhow", - "borsh 1.5.0", + "borsh 1.5.1", "clap", "configuration", "database-new", @@ -8941,7 +8941,7 @@ version = "0.2.10" dependencies = [ "actix-web", "anyhow", - "borsh 1.5.0", + "borsh 1.5.1", "clap", "configuration", "database", diff --git a/database-new/Cargo.toml b/database-new/Cargo.toml index c5099ea8..b4295eab 100644 --- a/database-new/Cargo.toml +++ b/database-new/Cargo.toml @@ -13,7 +13,7 @@ license.workspace = true anyhow = "1.0.86" async-trait = "0.1.66" bigdecimal = "0.3.0" # Dependency of sqlx-core and sqlx-postgres is version 0.3.0 -borsh = "1.3.1" +borsh = "1.5.1" futures = "0.3.5" hex = "0.4.3" lazy_static = "1.4.0" diff --git a/database-new/src/postgres/state_indexer.rs b/database-new/src/postgres/state_indexer.rs index 20750277..fa633afc 100644 --- a/database-new/src/postgres/state_indexer.rs +++ b/database-new/src/postgres/state_indexer.rs @@ -50,6 +50,156 @@ impl crate::PostgresDBManager { .await?; Ok(()) } + + async fn save_state_changes_access_key_to_shard( + &self, + shard_id: near_primitives::types::ShardId, + state_changes: Vec, + block_height: u64, + block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()> { + let mut query_builder: sqlx::QueryBuilder = sqlx::QueryBuilder::new( + "INSERT INTO state_changes_access_key (account_id, block_height, block_hash, data_key, data_value) ", + ); + query_builder.push_values(state_changes.iter(), |mut values, state_change| { + match &state_change.value { + near_primitives::views::StateChangeValueView::AccessKeyUpdate { + account_id, + public_key, + access_key, + } => { + let data_key = + borsh::to_vec(public_key).expect("Failed to borsh serialize public key"); + let data_value = + borsh::to_vec(access_key).expect("Failed to borsh serialize access key"); + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(hex::encode(&data_key).to_string()) + .push_bind(data_value); + } + near_primitives::views::StateChangeValueView::AccessKeyDeletion { + account_id, + public_key, + } => { + let data_key = + borsh::to_vec(public_key).expect("Failed to borsh serialize public key"); + let data_value: Option<&[u8]> = None; + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(hex::encode(data_key).to_string()) + .push_bind(data_value); + } + _ => {} + } + }); + query_builder + .build() + .execute( + self.shards_pool + .get(&shard_id) + .ok_or(anyhow::anyhow!("Shard not found"))?, + ) + .await?; + Ok(()) + } + + async fn save_state_changes_contract_to_shard( + &self, + shard_id: near_primitives::types::ShardId, + state_changes: Vec, + block_height: u64, + block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()> { + let mut query_builder: sqlx::QueryBuilder = sqlx::QueryBuilder::new( + "INSERT INTO state_changes_contract (account_id, block_height, block_hash, data_value) ", + ); + query_builder.push_values(state_changes.iter(), |mut values, state_change| { + match &state_change.value { + near_primitives::views::StateChangeValueView::ContractCodeUpdate { + account_id, + code, + } => { + let data_value: &[u8] = code.as_ref(); + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(data_value); + } + near_primitives::views::StateChangeValueView::ContractCodeDeletion { + account_id, + } => { + let data_value: Option<&[u8]> = None; + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(data_value); + } + _ => {} + } + }); + query_builder + .build() + .execute( + self.shards_pool + .get(&shard_id) + .ok_or(anyhow::anyhow!("Shard not found"))?, + ) + .await?; + Ok(()) + } + + async fn save_state_changes_account_to_shard( + &self, + shard_id: near_primitives::types::ShardId, + state_changes: Vec, + block_height: u64, + block_hash: near_indexer_primitives::CryptoHash, + ) -> anyhow::Result<()> { + let mut query_builder: sqlx::QueryBuilder = sqlx::QueryBuilder::new( + "INSERT INTO state_changes_account (account_id, block_height, block_hash, data_value) ", + ); + query_builder.push_values(state_changes.iter(), |mut values, state_change| { + match &state_change.value { + near_primitives::views::StateChangeValueView::AccountUpdate { + account_id, + account, + } => { + let data_value = + borsh::to_vec(&near_primitives::account::Account::from(account)) + .expect("Failed to borsh serialize account"); + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(data_value); + } + near_primitives::views::StateChangeValueView::AccountDeletion { account_id } => { + let data_value: Option<&[u8]> = None; + values + .push_bind(account_id.to_string()) + .push_bind(bigdecimal::BigDecimal::from(block_height)) + .push_bind(block_hash.to_string()) + .push_bind(data_value); + } + _ => {} + } + }); + query_builder + .build() + .execute( + self.shards_pool + .get(&shard_id) + .ok_or(anyhow::anyhow!("Shard not found"))?, + ) + .await?; + Ok(()) + } } #[async_trait::async_trait] @@ -247,8 +397,10 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { > = std::collections::HashMap::new(); for state_change in state_changes { match &state_change.value { - near_primitives::views::StateChangeValueView::DataUpdate { account_id, .. } | - near_primitives::views::StateChangeValueView::DataDeletion { account_id, .. } => { + near_primitives::views::StateChangeValueView::DataUpdate { account_id, .. } + | near_primitives::views::StateChangeValueView::DataDeletion { + account_id, .. + } => { let shard_id = near_primitives::shard_layout::account_id_to_shard_id( account_id, &self.shard_layout, @@ -276,7 +428,44 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { block_height: u64, block_hash: near_indexer_primitives::CryptoHash, ) -> anyhow::Result<()> { - todo!() + let mut state_changes_by_shards: std::collections::HashMap< + u64, + Vec, + > = std::collections::HashMap::new(); + for state_change in state_changes { + match &state_change.value { + near_primitives::views::StateChangeValueView::AccessKeyUpdate { + account_id, + .. + } + | near_primitives::views::StateChangeValueView::AccessKeyDeletion { + account_id, + .. + } => { + let shard_id = near_primitives::shard_layout::account_id_to_shard_id( + account_id, + &self.shard_layout, + ); + state_changes_by_shards + .entry(shard_id) + .or_insert_with(Vec::new) + .push(state_change); + } + _ => {} + } + } + let futures = state_changes_by_shards + .into_iter() + .map(|(shard_id, changes)| { + self.save_state_changes_access_key_to_shard( + shard_id, + changes, + block_height, + block_hash, + ) + }); + futures::future::try_join_all(futures).await?; + Ok(()) } async fn save_state_changes_contract( @@ -285,7 +474,44 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { block_height: u64, block_hash: near_indexer_primitives::CryptoHash, ) -> anyhow::Result<()> { - todo!() + let mut state_changes_by_shards: std::collections::HashMap< + u64, + Vec, + > = std::collections::HashMap::new(); + for state_change in state_changes { + match &state_change.value { + near_primitives::views::StateChangeValueView::ContractCodeUpdate { + account_id, + .. + } + | near_primitives::views::StateChangeValueView::ContractCodeDeletion { + account_id, + .. + } => { + let shard_id = near_primitives::shard_layout::account_id_to_shard_id( + account_id, + &self.shard_layout, + ); + state_changes_by_shards + .entry(shard_id) + .or_insert_with(Vec::new) + .push(state_change); + } + _ => {} + } + } + let futures = state_changes_by_shards + .into_iter() + .map(|(shard_id, changes)| { + self.save_state_changes_contract_to_shard( + shard_id, + changes, + block_height, + block_hash, + ) + }); + futures::future::try_join_all(futures).await?; + Ok(()) } async fn save_state_changes_account( @@ -294,6 +520,42 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { block_height: u64, block_hash: near_indexer_primitives::CryptoHash, ) -> anyhow::Result<()> { - todo!() + let mut state_changes_by_shards: std::collections::HashMap< + u64, + Vec, + > = std::collections::HashMap::new(); + for state_change in state_changes { + match &state_change.value { + near_primitives::views::StateChangeValueView::AccountUpdate { + account_id, .. + } + | near_primitives::views::StateChangeValueView::AccountDeletion { + account_id, + .. + } => { + let shard_id = near_primitives::shard_layout::account_id_to_shard_id( + account_id, + &self.shard_layout, + ); + state_changes_by_shards + .entry(shard_id) + .or_insert_with(Vec::new) + .push(state_change); + } + _ => {} + } + } + let futures = state_changes_by_shards + .into_iter() + .map(|(shard_id, changes)| { + self.save_state_changes_account_to_shard( + shard_id, + changes, + block_height, + block_hash, + ) + }); + futures::future::try_join_all(futures).await?; + Ok(()) } }