From 7fce8efb7d4ddc294cde5471e4719567954d8f0e Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Tue, 25 Jun 2024 09:48:42 +0300 Subject: [PATCH] improvement and optimisation --- database-new/src/base/rpc_server.rs | 7 -- database-new/src/base/state_indexer.rs | 13 --- .../meta_db/20240619122029_init.down.sql | 1 - .../meta_db/20240619122029_init.up.sql | 11 --- database-new/src/postgres/rpc_server.rs | 79 +++++++---------- database-new/src/postgres/state_indexer.rs | 84 ++++--------------- tx-indexer/src/main.rs | 3 +- 7 files changed, 49 insertions(+), 149 deletions(-) diff --git a/database-new/src/base/rpc_server.rs b/database-new/src/base/rpc_server.rs index 9e6b5fbb..64e096ad 100644 --- a/database-new/src/base/rpc_server.rs +++ b/database-new/src/base/rpc_server.rs @@ -132,11 +132,4 @@ pub trait ReaderDbManager { block_height: near_primitives::types::BlockHeight, method_name: &str, ) -> anyhow::Result; - - /// Return protocol config by the given epoch id - async fn get_protocol_config_by_epoch_id( - &self, - epoch_id: near_primitives::hash::CryptoHash, - method_name: &str, - ) -> anyhow::Result; } diff --git a/database-new/src/base/state_indexer.rs b/database-new/src/base/state_indexer.rs index e46208b8..5dbfd901 100644 --- a/database-new/src/base/state_indexer.rs +++ b/database-new/src/base/state_indexer.rs @@ -32,19 +32,6 @@ pub trait StateIndexerDbManager { epoch_height: u64, epoch_start_height: u64, validators_info: &near_primitives::views::EpochValidatorInfo, - ) -> anyhow::Result<()>; - - async fn add_protocol_config( - &self, - epoch_id: near_indexer_primitives::CryptoHash, - epoch_height: u64, - epoch_start_height: u64, - protocol_config: &near_chain_configs::ProtocolConfigView, - ) -> anyhow::Result<()>; - - async fn update_epoch_end_height( - &self, - epoch_id: near_indexer_primitives::CryptoHash, epoch_end_block_hash: near_indexer_primitives::CryptoHash, ) -> anyhow::Result<()>; diff --git a/database-new/src/postgres/migrations/meta_db/20240619122029_init.down.sql b/database-new/src/postgres/migrations/meta_db/20240619122029_init.down.sql index 3c6a952f..5f630fb1 100644 --- a/database-new/src/postgres/migrations/meta_db/20240619122029_init.down.sql +++ b/database-new/src/postgres/migrations/meta_db/20240619122029_init.down.sql @@ -3,5 +3,4 @@ DROP TABLE IF EXISTS blocks; DROP TABLE IF EXISTS chunks; DROP TABLE IF EXISTS chunks_duplicate; DROP TABLE IF EXISTS validators; -DROP TABLE IF EXISTS protocol_configs; DROP TABLE IF EXISTS meta; diff --git a/database-new/src/postgres/migrations/meta_db/20240619122029_init.up.sql b/database-new/src/postgres/migrations/meta_db/20240619122029_init.up.sql index 79c05a82..f630b29f 100644 --- a/database-new/src/postgres/migrations/meta_db/20240619122029_init.up.sql +++ b/database-new/src/postgres/migrations/meta_db/20240619122029_init.up.sql @@ -66,17 +66,6 @@ CREATE TABLE IF NOT EXISTS validators ( validators_info jsonb NOT NULL ); - --- Create protocol_configs table to store protocol config for each epoch -CREATE TABLE IF NOT EXISTS protocol_configs ( - epoch_id text NOT NULL PRIMARY KEY, - epoch_height numeric(20,0) NOT NULL, - epoch_start_height numeric(20,0) NOT NULL, - epoch_end_height numeric(20,0) NULL, - protocol_config jsonb NOT NULL -); - - -- Create meta table to store last processed block height for each indexer CREATE TABLE IF NOT EXISTS meta ( indexer_id text NOT NULL PRIMARY KEY, diff --git a/database-new/src/postgres/rpc_server.rs b/database-new/src/postgres/rpc_server.rs index c4226b1c..e846b0e7 100644 --- a/database-new/src/postgres/rpc_server.rs +++ b/database-new/src/postgres/rpc_server.rs @@ -16,7 +16,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT block_height FROM blocks - WHERE block_hash = ? + WHERE block_hash = $1 LIMIT 1 ", ) @@ -40,7 +40,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT block_height, shard_id FROM chunks - WHERE chunk_hash = ? + WHERE chunk_hash = $1 LIMIT 1 ", ) @@ -98,7 +98,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT data_value FROM state_changes_data - WHERE account_id = ? AND key_data = ? AND block_height <= ? + WHERE account_id = $1 AND key_data = $2 AND block_height <= $3 ORDER BY block_height DESC LIMIT 1 ", @@ -130,7 +130,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT block_height, block_hash, data_value FROM state_changes_account - WHERE account_id = ? AND block_height <= ? + WHERE account_id = $1 AND block_height <= $2 ORDER BY block_height DESC LIMIT 1 ", @@ -166,7 +166,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT block_height, block_hash, data_value FROM state_changes_contract - WHERE account_id = ? AND block_height <= ? + WHERE account_id = $1 AND block_height <= $2 ORDER BY block_height DESC LIMIT 1 ", @@ -204,7 +204,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT block_height, block_hash, data_value FROM state_changes_access_key - WHERE account_id = ? AND data_key = ? AND block_height <= ? + WHERE account_id = $1 AND data_key = $2 AND block_height <= $3 ORDER BY block_height DESC LIMIT 1 ", @@ -239,7 +239,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT receipt_id, parent_transaction_hash, receiver_id, block_height, block_hash, shard_id FROM receipts_map - WHERE receipt_id = ? + WHERE receipt_id = $1 LIMIT 1 ", ) @@ -292,7 +292,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { " SELECT included_in_block_height, shard_id FROM chunks_duplicate - WHERE block_height = ? AND shard_id = ? + WHERE block_height = $1 AND shard_id = $2 LIMIT 1 ", ) @@ -311,19 +311,20 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { crate::metrics::META_DATABASE_READ_QUERIES .with_label_values(&[method_name, "validators"]) .inc(); - let (epoch_height, validators_info): (bigdecimal::BigDecimal, String) = sqlx::query_as( - " + let (epoch_height, validators_info): (bigdecimal::BigDecimal, serde_json::Value) = + sqlx::query_as( + " SELECT epoch_height, validators_info FROM validators - WHERE epoch_id = ? + WHERE epoch_id = $1 LIMIT 1 ", - ) - .bind(epoch_id.to_string()) - .fetch_one(&self.meta_db_pool) - .await?; + ) + .bind(epoch_id.to_string()) + .fetch_one(&self.meta_db_pool) + .await?; let validators_info: near_primitives::views::EpochValidatorInfo = - serde_json::from_str(&validators_info)?; + serde_json::from_value(validators_info)?; Ok(readnode_primitives::EpochValidatorsInfo { epoch_id, epoch_height: epoch_height @@ -342,22 +343,25 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { crate::metrics::META_DATABASE_READ_QUERIES .with_label_values(&[method_name, "validators"]) .inc(); - let (epoch_id, epoch_height, validators_info): (String, bigdecimal::BigDecimal, String) = - sqlx::query_as( - " + let (epoch_id, epoch_height, validators_info): ( + String, + bigdecimal::BigDecimal, + serde_json::Value, + ) = sqlx::query_as( + " SELECT epoch_id, epoch_height, validators_info FROM validators - WHERE epoch_end_height = ? + WHERE epoch_end_height = $1 LIMIT 1 ", - ) - .bind(bigdecimal::BigDecimal::from(block_height)) - .fetch_one(&self.meta_db_pool) - .await?; + ) + .bind(bigdecimal::BigDecimal::from(block_height)) + .fetch_one(&self.meta_db_pool) + .await?; let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch_id) .map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?; let validators_info: near_primitives::views::EpochValidatorInfo = - serde_json::from_str(&validators_info)?; + serde_json::from_value(validators_info)?; Ok(readnode_primitives::EpochValidatorsInfo { epoch_id, epoch_height: epoch_height @@ -367,29 +371,4 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { validators_info, }) } - - async fn get_protocol_config_by_epoch_id( - &self, - epoch_id: near_indexer_primitives::CryptoHash, - method_name: &str, - ) -> anyhow::Result { - crate::metrics::META_DATABASE_READ_QUERIES - .with_label_values(&[method_name, "protocol_configs"]) - .inc(); - let (protocol_config,): (String,) = sqlx::query_as( - " - SELECT protocol_config - FROM protocol_configs - WHERE epoch_id = ? - LIMIT 1 - ", - ) - .bind(epoch_id.to_string()) - .fetch_one(&self.meta_db_pool) - .await?; - let protocol_config: near_chain_configs::ProtocolConfigView = - serde_json::from_str(&protocol_config)?; - - Ok(protocol_config) - } } diff --git a/database-new/src/postgres/state_indexer.rs b/database-new/src/postgres/state_indexer.rs index 2be6dbf8..ac81399e 100644 --- a/database-new/src/postgres/state_indexer.rs +++ b/database-new/src/postgres/state_indexer.rs @@ -40,6 +40,7 @@ impl crate::PostgresDBManager { _ => {} } }); + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder .build() .execute( @@ -96,6 +97,7 @@ impl crate::PostgresDBManager { _ => {} } }); + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder .build() .execute( @@ -143,6 +145,7 @@ impl crate::PostgresDBManager { _ => {} } }); + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder .build() .execute( @@ -190,6 +193,7 @@ impl crate::PostgresDBManager { _ => {} } }); + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder .build() .execute( @@ -212,7 +216,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { sqlx::query( " INSERT INTO blocks (block_height, block_hash) - VALUES ($1, $2) + VALUES ($1, $2) ON CONFLICT DO NOTHING; ", ) .bind(bigdecimal::BigDecimal::from(block_height)) @@ -231,17 +235,17 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { crate::primitives::HeightIncluded, )>, ) -> anyhow::Result<()> { - let chunks_uniq = chunks + let unique_chunks = chunks .iter() .filter(|(_chunk_hash, _shard_id, height_included)| height_included == &block_height) .collect::>(); - if !chunks_uniq.is_empty() { + if !unique_chunks.is_empty() { let mut query_builder: sqlx::QueryBuilder = sqlx::QueryBuilder::new("INSERT INTO chunks (chunk_hash, block_height, shard_id) "); query_builder.push_values( - chunks_uniq.iter(), + unique_chunks.iter(), |mut values, (chunk_hash, shard_id, height_included)| { values .push_bind(chunk_hash.to_string()) @@ -249,7 +253,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { .push_bind(bigdecimal::BigDecimal::from(shard_id.clone())); }, ); - + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder.build().execute(&self.meta_db_pool).await?; } @@ -271,7 +275,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { .push_bind(bigdecimal::BigDecimal::from(height_included.clone())); }, ); - + query_builder.push(" ON CONFLICT DO NOTHING;"); query_builder.build().execute(&self.meta_db_pool).await?; } @@ -323,7 +327,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { " SELECT last_processed_block_height FROM meta - WHERE indexer_id = ? + WHERE indexer_id = $1 LIMIT 1; ", ) @@ -341,79 +345,27 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager { epoch_height: u64, epoch_start_height: u64, validators_info: &near_primitives::views::EpochValidatorInfo, + epoch_end_block_hash: near_indexer_primitives::CryptoHash, ) -> anyhow::Result<()> { - sqlx::query( - " - INSERT INTO validators (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config) - VALUES (?, ?, ?, NULL, ?); - " - ) - .bind(&epoch_id.to_string()) - .bind(bigdecimal::BigDecimal::from(epoch_height)) - .bind(bigdecimal::BigDecimal::from(epoch_start_height)) - .bind(&serde_json::to_string(validators_info)?) - .execute(&self.meta_db_pool) + let epoch_end_block_height = self + .get_block_by_hash(epoch_end_block_hash, "add_validators") .await?; - Ok(()) - } - - async fn add_protocol_config( - &self, - epoch_id: near_indexer_primitives::CryptoHash, - epoch_height: u64, - epoch_start_height: u64, - protocol_config: &near_chain_configs::ProtocolConfigView, - ) -> anyhow::Result<()> { sqlx::query( " - INSERT INTO protocol_configs (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config) - VALUES (?, ?, ?, NULL, ?); + INSERT INTO validators (epoch_id, epoch_height, epoch_start_height, epoch_end_height, validators_info) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING; " ) .bind(&epoch_id.to_string()) .bind(bigdecimal::BigDecimal::from(epoch_height)) .bind(bigdecimal::BigDecimal::from(epoch_start_height)) - .bind(&serde_json::to_string(protocol_config)?) + .bind(bigdecimal::BigDecimal::from(epoch_end_block_height)) + .bind(&serde_json::to_value(validators_info)?) .execute(&self.meta_db_pool) .await?; Ok(()) } - async fn update_epoch_end_height( - &self, - epoch_id: near_indexer_primitives::CryptoHash, - epoch_end_block_hash: near_indexer_primitives::CryptoHash, - ) -> anyhow::Result<()> { - let block_height = self - .get_block_by_hash(epoch_end_block_hash, "update_epoch_end_height") - .await?; - let epoch_id_str = epoch_id.to_string(); - let updated_validators_future = sqlx::query( - " - UPDATE validators - SET epoch_end_height = ? - WHERE epoch_id = ?; - ", - ) - .bind(bigdecimal::BigDecimal::from(block_height)) - .bind(&epoch_id_str) - .execute(&self.meta_db_pool); - - let updated_protocol_config_future = sqlx::query( - " - UPDATE protocol_configs - SET epoch_end_height = ? - WHERE epoch_id = ?; - ", - ) - .bind(bigdecimal::BigDecimal::from(block_height)) - .bind(&epoch_id_str) - .execute(&self.meta_db_pool); - - futures::try_join!(updated_validators_future, updated_protocol_config_future)?; - Ok(()) - } - async fn save_state_changes_data( &self, state_changes: Vec, diff --git a/tx-indexer/src/main.rs b/tx-indexer/src/main.rs index 14fbf6fc..892096cb 100644 --- a/tx-indexer/src/main.rs +++ b/tx-indexer/src/main.rs @@ -24,7 +24,8 @@ async fn main() -> anyhow::Result<()> { // #[cfg(feature = "scylla_db")] let db_manager: std::sync::Arc> = std::sync::Arc::new(Box::new( - database::prepare_db_manager::(&indexer_config.database).await?, + database::prepare_db_manager::(&indexer_config.database) + .await?, )); // #[cfg(all(feature = "postgres_db", not(feature = "scylla_db")))] // let db_manager: std::sync::Arc<