Skip to content

Commit

Permalink
improvement and optimisation
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Jun 25, 2024
1 parent e2b47f0 commit 7fce8ef
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 149 deletions.
7 changes: 0 additions & 7 deletions database-new/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,4 @@ pub trait ReaderDbManager {
block_height: near_primitives::types::BlockHeight,
method_name: &str,
) -> anyhow::Result<readnode_primitives::EpochValidatorsInfo>;

/// Return protocol config by the given epoch id
async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_primitives::hash::CryptoHash,
method_name: &str,
) -> anyhow::Result<near_chain_configs::ProtocolConfigView>;
}
13 changes: 0 additions & 13 deletions database-new/src/base/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,6 @@ pub trait StateIndexerDbManager {
epoch_height: u64,
epoch_start_height: u64,
validators_info: &near_primitives::views::EpochValidatorInfo,
) -> anyhow::Result<()>;

async fn add_protocol_config(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
protocol_config: &near_chain_configs::ProtocolConfigView,
) -> anyhow::Result<()>;

async fn update_epoch_end_height(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<()>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ DROP TABLE IF EXISTS blocks;
DROP TABLE IF EXISTS chunks;
DROP TABLE IF EXISTS chunks_duplicate;
DROP TABLE IF EXISTS validators;
DROP TABLE IF EXISTS protocol_configs;
DROP TABLE IF EXISTS meta;
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,6 @@ CREATE TABLE IF NOT EXISTS validators (
validators_info jsonb NOT NULL
);


-- Create protocol_configs table to store protocol config for each epoch
CREATE TABLE IF NOT EXISTS protocol_configs (
epoch_id text NOT NULL PRIMARY KEY,
epoch_height numeric(20,0) NOT NULL,
epoch_start_height numeric(20,0) NOT NULL,
epoch_end_height numeric(20,0) NULL,
protocol_config jsonb NOT NULL
);


-- Create meta table to store last processed block height for each indexer
CREATE TABLE IF NOT EXISTS meta (
indexer_id text NOT NULL PRIMARY KEY,
Expand Down
79 changes: 29 additions & 50 deletions database-new/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT block_height
FROM blocks
WHERE block_hash = ?
WHERE block_hash = $1
LIMIT 1
",
)
Expand All @@ -40,7 +40,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT block_height, shard_id
FROM chunks
WHERE chunk_hash = ?
WHERE chunk_hash = $1
LIMIT 1
",
)
Expand Down Expand Up @@ -98,7 +98,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT data_value
FROM state_changes_data
WHERE account_id = ? AND key_data = ? AND block_height <= ?
WHERE account_id = $1 AND key_data = $2 AND block_height <= $3
ORDER BY block_height DESC
LIMIT 1
",
Expand Down Expand Up @@ -130,7 +130,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT block_height, block_hash, data_value
FROM state_changes_account
WHERE account_id = ? AND block_height <= ?
WHERE account_id = $1 AND block_height <= $2
ORDER BY block_height DESC
LIMIT 1
",
Expand Down Expand Up @@ -166,7 +166,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT block_height, block_hash, data_value
FROM state_changes_contract
WHERE account_id = ? AND block_height <= ?
WHERE account_id = $1 AND block_height <= $2
ORDER BY block_height DESC
LIMIT 1
",
Expand Down Expand Up @@ -204,7 +204,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT block_height, block_hash, data_value
FROM state_changes_access_key
WHERE account_id = ? AND data_key = ? AND block_height <= ?
WHERE account_id = $1 AND data_key = $2 AND block_height <= $3
ORDER BY block_height DESC
LIMIT 1
",
Expand Down Expand Up @@ -239,7 +239,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT receipt_id, parent_transaction_hash, receiver_id, block_height, block_hash, shard_id
FROM receipts_map
WHERE receipt_id = ?
WHERE receipt_id = $1
LIMIT 1
",
)
Expand Down Expand Up @@ -292,7 +292,7 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"
SELECT included_in_block_height, shard_id
FROM chunks_duplicate
WHERE block_height = ? AND shard_id = ?
WHERE block_height = $1 AND shard_id = $2
LIMIT 1
",
)
Expand All @@ -311,19 +311,20 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
crate::metrics::META_DATABASE_READ_QUERIES
.with_label_values(&[method_name, "validators"])
.inc();
let (epoch_height, validators_info): (bigdecimal::BigDecimal, String) = sqlx::query_as(
"
let (epoch_height, validators_info): (bigdecimal::BigDecimal, serde_json::Value) =
sqlx::query_as(
"
SELECT epoch_height, validators_info
FROM validators
WHERE epoch_id = ?
WHERE epoch_id = $1
LIMIT 1
",
)
.bind(epoch_id.to_string())
.fetch_one(&self.meta_db_pool)
.await?;
)
.bind(epoch_id.to_string())
.fetch_one(&self.meta_db_pool)
.await?;
let validators_info: near_primitives::views::EpochValidatorInfo =
serde_json::from_str(&validators_info)?;
serde_json::from_value(validators_info)?;
Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height: epoch_height
Expand All @@ -342,22 +343,25 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
crate::metrics::META_DATABASE_READ_QUERIES
.with_label_values(&[method_name, "validators"])
.inc();
let (epoch_id, epoch_height, validators_info): (String, bigdecimal::BigDecimal, String) =
sqlx::query_as(
"
let (epoch_id, epoch_height, validators_info): (
String,
bigdecimal::BigDecimal,
serde_json::Value,
) = sqlx::query_as(
"
SELECT epoch_id, epoch_height, validators_info
FROM validators
WHERE epoch_end_height = ?
WHERE epoch_end_height = $1
LIMIT 1
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
.fetch_one(&self.meta_db_pool)
.await?;
)
.bind(bigdecimal::BigDecimal::from(block_height))
.fetch_one(&self.meta_db_pool)
.await?;
let epoch_id = near_indexer_primitives::CryptoHash::from_str(&epoch_id)
.map_err(|err| anyhow::anyhow!("Failed to parse `epoch_id` to CryptoHash: {}", err))?;
let validators_info: near_primitives::views::EpochValidatorInfo =
serde_json::from_str(&validators_info)?;
serde_json::from_value(validators_info)?;
Ok(readnode_primitives::EpochValidatorsInfo {
epoch_id,
epoch_height: epoch_height
Expand All @@ -367,29 +371,4 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
validators_info,
})
}

async fn get_protocol_config_by_epoch_id(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
method_name: &str,
) -> anyhow::Result<near_chain_configs::ProtocolConfigView> {
crate::metrics::META_DATABASE_READ_QUERIES
.with_label_values(&[method_name, "protocol_configs"])
.inc();
let (protocol_config,): (String,) = sqlx::query_as(
"
SELECT protocol_config
FROM protocol_configs
WHERE epoch_id = ?
LIMIT 1
",
)
.bind(epoch_id.to_string())
.fetch_one(&self.meta_db_pool)
.await?;
let protocol_config: near_chain_configs::ProtocolConfigView =
serde_json::from_str(&protocol_config)?;

Ok(protocol_config)
}
}
84 changes: 18 additions & 66 deletions database-new/src/postgres/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(
Expand Down Expand Up @@ -96,6 +97,7 @@ impl crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(
Expand Down Expand Up @@ -143,6 +145,7 @@ impl crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(
Expand Down Expand Up @@ -190,6 +193,7 @@ impl crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(
Expand All @@ -212,7 +216,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
sqlx::query(
"
INSERT INTO blocks (block_height, block_hash)
VALUES ($1, $2)
VALUES ($1, $2) ON CONFLICT DO NOTHING;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
Expand All @@ -231,25 +235,25 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
crate::primitives::HeightIncluded,
)>,
) -> anyhow::Result<()> {
let chunks_uniq = chunks
let unique_chunks = chunks
.iter()
.filter(|(_chunk_hash, _shard_id, height_included)| height_included == &block_height)
.collect::<Vec<_>>();

if !chunks_uniq.is_empty() {
if !unique_chunks.is_empty() {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("INSERT INTO chunks (chunk_hash, block_height, shard_id) ");

query_builder.push_values(
chunks_uniq.iter(),
unique_chunks.iter(),
|mut values, (chunk_hash, shard_id, height_included)| {
values
.push_bind(chunk_hash.to_string())
.push_bind(bigdecimal::BigDecimal::from(height_included.clone()))
.push_bind(bigdecimal::BigDecimal::from(shard_id.clone()));
},
);

query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder.build().execute(&self.meta_db_pool).await?;
}

Expand All @@ -271,7 +275,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
.push_bind(bigdecimal::BigDecimal::from(height_included.clone()));
},
);

query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder.build().execute(&self.meta_db_pool).await?;
}

Expand Down Expand Up @@ -323,7 +327,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
"
SELECT last_processed_block_height
FROM meta
WHERE indexer_id = ?
WHERE indexer_id = $1
LIMIT 1;
",
)
Expand All @@ -341,79 +345,27 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
epoch_height: u64,
epoch_start_height: u64,
validators_info: &near_primitives::views::EpochValidatorInfo,
epoch_end_block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<()> {
sqlx::query(
"
INSERT INTO validators (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config)
VALUES (?, ?, ?, NULL, ?);
"
)
.bind(&epoch_id.to_string())
.bind(bigdecimal::BigDecimal::from(epoch_height))
.bind(bigdecimal::BigDecimal::from(epoch_start_height))
.bind(&serde_json::to_string(validators_info)?)
.execute(&self.meta_db_pool)
let epoch_end_block_height = self
.get_block_by_hash(epoch_end_block_hash, "add_validators")
.await?;
Ok(())
}

async fn add_protocol_config(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_height: u64,
epoch_start_height: u64,
protocol_config: &near_chain_configs::ProtocolConfigView,
) -> anyhow::Result<()> {
sqlx::query(
"
INSERT INTO protocol_configs (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config)
VALUES (?, ?, ?, NULL, ?);
INSERT INTO validators (epoch_id, epoch_height, epoch_start_height, epoch_end_height, validators_info)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING;
"
)
.bind(&epoch_id.to_string())
.bind(bigdecimal::BigDecimal::from(epoch_height))
.bind(bigdecimal::BigDecimal::from(epoch_start_height))
.bind(&serde_json::to_string(protocol_config)?)
.bind(bigdecimal::BigDecimal::from(epoch_end_block_height))
.bind(&serde_json::to_value(validators_info)?)
.execute(&self.meta_db_pool)
.await?;
Ok(())
}

async fn update_epoch_end_height(
&self,
epoch_id: near_indexer_primitives::CryptoHash,
epoch_end_block_hash: near_indexer_primitives::CryptoHash,
) -> anyhow::Result<()> {
let block_height = self
.get_block_by_hash(epoch_end_block_hash, "update_epoch_end_height")
.await?;
let epoch_id_str = epoch_id.to_string();
let updated_validators_future = sqlx::query(
"
UPDATE validators
SET epoch_end_height = ?
WHERE epoch_id = ?;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
.bind(&epoch_id_str)
.execute(&self.meta_db_pool);

let updated_protocol_config_future = sqlx::query(
"
UPDATE protocol_configs
SET epoch_end_height = ?
WHERE epoch_id = ?;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
.bind(&epoch_id_str)
.execute(&self.meta_db_pool);

futures::try_join!(updated_validators_future, updated_protocol_config_future)?;
Ok(())
}

async fn save_state_changes_data(
&self,
state_changes: Vec<near_primitives::views::StateChangeWithCauseView>,
Expand Down
Loading

0 comments on commit 7fce8ef

Please sign in to comment.