database-new crate
Yurii Koba authored and kobayurii committed Jun 24, 2024
1 parent 980c381 commit e2b47f0
Showing 13 changed files with 479 additions and 876 deletions.
1,036 changes: 256 additions & 780 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -10,7 +10,7 @@ license = "MIT OR Apache-2.0"
resolver = "2"
members = [
"configuration",
"database",
# "database",
"database-new",
"epoch-indexer",
"near-state-indexer",
@@ -36,7 +36,7 @@ members = [
[workspace.dependencies]

configuration = { path = "configuration" }
database = { path = "database" }
#database = { path = "database" }
database-new = { path = "database-new" }
readnode-primitives = { path = "readnode-primitives" }
epoch-indexer = { path = "epoch-indexer" }
1 change: 1 addition & 0 deletions database-new/src/postgres/mod.rs
@@ -1,5 +1,6 @@
mod rpc_server;
mod state_indexer;
mod tx_indexer;

static META_DB_MIGRATOR: sqlx::migrate::Migrator =
sqlx::migrate!("src/postgres/migrations/meta_db");
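
The new tx_indexer module is registered next to the existing rpc_server and state_indexer modules, and the Migrator above embeds the SQL files under src/postgres/migrations/meta_db at compile time. For context, a minimal sketch of how such a migrator is typically applied at startup; the pool handle is an assumption, not something this hunk shows:

// Hedged sketch (not part of this diff): applying the embedded migrations at
// startup. `meta_db_pool` stands in for however the Postgres pool is created.
async fn run_meta_db_migrations(meta_db_pool: &sqlx::PgPool) -> anyhow::Result<()> {
    META_DB_MIGRATOR.run(meta_db_pool).await?;
    Ok(())
}
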
81 changes: 55 additions & 26 deletions database-new/src/postgres/state_indexer.rs
@@ -212,7 +212,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
sqlx::query(
"
INSERT INTO blocks (block_height, block_hash)
VALUES (?, ?)
VALUES ($1, $2)
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
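
The switch from ? to $1, $2 is the numbered bind-parameter syntax required by sqlx's Postgres driver; ? placeholders are only accepted by the MySQL and SQLite drivers. A minimal standalone sketch of the same pattern, with the pool handle as an illustrative assumption:

// Hedged sketch of numbered bind parameters with sqlx + Postgres.
async fn insert_block(
    meta_db_pool: &sqlx::PgPool,
    block_height: u64,
    block_hash: &str,
) -> anyhow::Result<()> {
    sqlx::query("INSERT INTO blocks (block_height, block_hash) VALUES ($1, $2)")
        .bind(bigdecimal::BigDecimal::from(block_height))
        .bind(block_hash)
        .execute(meta_db_pool)
        .await?;
    Ok(())
}
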
@@ -231,22 +231,50 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
crate::primitives::HeightIncluded,
)>,
) -> anyhow::Result<()> {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> = sqlx::QueryBuilder::new(
"INSERT INTO chunks (block_height, chunk_hash, shard_id, height_included) ",
);
let chunks_uniq = chunks
.iter()
.filter(|(_chunk_hash, _shard_id, height_included)| height_included == &block_height)
.collect::<Vec<_>>();

query_builder.push_values(
chunks.into_iter(),
|mut values, (chunk_hash, shard_id, height_included)| {
values
.push_bind(bigdecimal::BigDecimal::from(block_height))
.push_bind(chunk_hash.to_string())
.push_bind(bigdecimal::BigDecimal::from(shard_id))
.push_bind(bigdecimal::BigDecimal::from(height_included));
},
);
if !chunks_uniq.is_empty() {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("INSERT INTO chunks (chunk_hash, block_height, shard_id) ");

query_builder.push_values(
chunks_uniq.iter(),
|mut values, (chunk_hash, shard_id, height_included)| {
values
.push_bind(chunk_hash.to_string())
.push_bind(bigdecimal::BigDecimal::from(height_included.clone()))
.push_bind(bigdecimal::BigDecimal::from(shard_id.clone()));
},
);

query_builder.build().execute(&self.meta_db_pool).await?;
}

let chunks_duplicate = chunks
.iter()
.filter(|(_chunk_hash, _shard_id, height_included)| height_included != &block_height)
.collect::<Vec<_>>();
if !chunks_duplicate.is_empty() {
let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
sqlx::QueryBuilder::new("INSERT INTO chunks_duplicate (chunk_hash, block_height, shard_id, included_in_block_height) ");

query_builder.push_values(
chunks.iter(),
|mut values, (chunk_hash, shard_id, height_included)| {
values
.push_bind(chunk_hash.to_string())
.push_bind(bigdecimal::BigDecimal::from(block_height))
.push_bind(bigdecimal::BigDecimal::from(shard_id.clone()))
.push_bind(bigdecimal::BigDecimal::from(height_included.clone()));
},
);

query_builder.build().execute(&self.meta_db_pool).await?;
}

query_builder.build().execute(&self.meta_db_pool).await?;
Ok(())
}
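
The rewritten method batches rows with sqlx's QueryBuilder::push_values, and splits chunks whose height_included equals the current block from duplicates that point back at an earlier block, routing the latter to the separate chunks_duplicate table. A minimal sketch of the batching pattern in isolation; the helper and its row tuple are illustrative, not part of the commit:

// Hedged sketch of the QueryBuilder batch-insert pattern used above.
async fn insert_chunk_rows(
    meta_db_pool: &sqlx::PgPool,
    rows: Vec<(String, u64, u64)>, // (chunk_hash, block_height, shard_id)
) -> anyhow::Result<()> {
    if rows.is_empty() {
        // push_values over an empty iterator would build invalid SQL.
        return Ok(());
    }
    let mut query_builder: sqlx::QueryBuilder<sqlx::Postgres> =
        sqlx::QueryBuilder::new("INSERT INTO chunks (chunk_hash, block_height, shard_id) ");
    query_builder.push_values(rows, |mut values, (chunk_hash, block_height, shard_id)| {
        values
            .push_bind(chunk_hash)
            .push_bind(bigdecimal::BigDecimal::from(block_height))
            .push_bind(bigdecimal::BigDecimal::from(shard_id));
    });
    query_builder.build().execute(meta_db_pool).await?;
    Ok(())
}
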

@@ -262,8 +290,8 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
"
SELECT block_height
FROM blocks
WHERE block_hash = ?
LIMIT 1
WHERE block_hash = $1
LIMIT 1;
",
)
.bind(block_hash.to_string())
@@ -277,13 +305,14 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()> {
sqlx::query(
"
UPDATE meta
SET last_processed_block_height = ?
WHERE indexer_id = ?
INSERT INTO meta (indexer_id, last_processed_block_height)
VALUES ($1, $2)
ON CONFLICT (indexer_id)
DO UPDATE SET last_processed_block_height = $2;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
.bind(indexer_id)
.bind(bigdecimal::BigDecimal::from(block_height))
.execute(&self.meta_db_pool)
.await?;
Ok(())
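
update_meta changes from a plain UPDATE to an upsert, so the meta row is created on an indexer's first run instead of the UPDATE silently matching nothing. The same ON CONFLICT pattern, sketched standalone with the pool handle assumed:

// Hedged sketch of the upsert adopted above.
async fn upsert_meta(
    meta_db_pool: &sqlx::PgPool,
    indexer_id: &str,
    block_height: u64,
) -> anyhow::Result<()> {
    sqlx::query(
        "INSERT INTO meta (indexer_id, last_processed_block_height)
         VALUES ($1, $2)
         ON CONFLICT (indexer_id) DO UPDATE SET last_processed_block_height = $2",
    )
    .bind(indexer_id)
    .bind(bigdecimal::BigDecimal::from(block_height))
    .execute(meta_db_pool)
    .await?;
    Ok(())
}
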
@@ -295,7 +324,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
SELECT last_processed_block_height
FROM meta
WHERE indexer_id = ?
LIMIT 1
LIMIT 1;
",
)
.bind(indexer_id)
@@ -316,7 +345,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
sqlx::query(
"
INSERT INTO validators (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config)
VALUES (?, ?, ?, NULL, ?)
VALUES (?, ?, ?, NULL, ?);
"
)
.bind(&epoch_id.to_string())
@@ -338,7 +367,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
sqlx::query(
"
INSERT INTO protocol_configs (epoch_id, epoch_height, epoch_start_height, epoch_end_height, protocol_config)
VALUES (?, ?, ?, NULL, ?)
VALUES (?, ?, ?, NULL, ?);
"
)
.bind(&epoch_id.to_string())
@@ -363,7 +392,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
"
UPDATE validators
SET epoch_end_height = ?
WHERE epoch_id = ?
WHERE epoch_id = ?;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
Expand All @@ -374,7 +403,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
"
UPDATE protocol_configs
SET epoch_end_height = ?
WHERE epoch_id = ?
WHERE epoch_id = ?;
",
)
.bind(bigdecimal::BigDecimal::from(block_height))
88 changes: 88 additions & 0 deletions database-new/src/postgres/tx_indexer.rs
@@ -0,0 +1,88 @@
use near_indexer_primitives::IndexerExecutionOutcomeWithReceipt;
use readnode_primitives::{CollectingTransactionDetails, TransactionKey};
use std::collections::HashMap;

#[async_trait::async_trait]
impl crate::TxIndexerDbManager for crate::PostgresDBManager {
async fn add_transaction(
&self,
transaction_hash: &str,
tx_bytes: Vec<u8>,
block_height: u64,
signer_id: &str,
) -> anyhow::Result<()> {
todo!()
}

async fn validate_saved_transaction_deserializable(
&self,
transaction_hash: &str,
tx_bytes: &[u8],
) -> anyhow::Result<bool> {
todo!()
}

async fn add_receipt(
&self,
receipt_id: &str,
parent_tx_hash: &str,
block_height: u64,
shard_id: u64,
) -> anyhow::Result<()> {
todo!()
}

async fn update_meta(&self, indexer_id: &str, block_height: u64) -> anyhow::Result<()> {
todo!()
}

async fn cache_add_transaction(
&self,
transaction_details: CollectingTransactionDetails,
) -> anyhow::Result<()> {
todo!()
}

async fn cache_add_receipt(
&self,
transaction_key: TransactionKey,
indexer_execution_outcome_with_receipt: IndexerExecutionOutcomeWithReceipt,
) -> anyhow::Result<()> {
todo!()
}

async fn get_transactions_to_cache(
&self,
start_block_height: u64,
cache_restore_blocks_range: u64,
max_db_parallel_queries: i64,
) -> anyhow::Result<HashMap<TransactionKey, CollectingTransactionDetails>> {
todo!()
}

async fn get_transaction_by_receipt_id(
&self,
receipt_id: &str,
) -> anyhow::Result<CollectingTransactionDetails> {
todo!()
}

async fn get_receipts_in_cache(
&self,
transaction_key: &TransactionKey,
) -> anyhow::Result<Vec<IndexerExecutionOutcomeWithReceipt>> {
todo!()
}

async fn cache_delete_transaction(
&self,
transaction_hash: &str,
block_height: u64,
) -> anyhow::Result<()> {
todo!()
}

async fn get_last_processed_block_height(&self, indexer_id: &str) -> anyhow::Result<u64> {
todo!()
}
}
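
Every TxIndexerDbManager method in this new file is stubbed with todo!(), so the Postgres implementation of the transaction indexer is evidently deferred. Purely as a hedged illustration, one stub might later take a shape like the following; the transactions table and its columns are assumptions, not anything this commit defines:

// Hypothetical sketch only - not the committed implementation.
// Assumes a `transactions (transaction_hash, block_height, signer_id, transaction_details)` table.
async fn add_transaction_sketch(
    meta_db_pool: &sqlx::PgPool,
    transaction_hash: &str,
    tx_bytes: Vec<u8>,
    block_height: u64,
    signer_id: &str,
) -> anyhow::Result<()> {
    sqlx::query(
        "INSERT INTO transactions (transaction_hash, block_height, signer_id, transaction_details)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT DO NOTHING",
    )
    .bind(transaction_hash)
    .bind(bigdecimal::BigDecimal::from(block_height))
    .bind(signer_id)
    .bind(tx_bytes)
    .execute(meta_db_pool)
    .await?;
    Ok(())
}
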
12 changes: 6 additions & 6 deletions epoch-indexer/Cargo.toml
@@ -20,16 +20,16 @@ tokio = { version = "1.36.0", features = [
tracing = "0.1.34"

configuration.workspace = true
database.workspace = true
database-new.workspace = true
readnode-primitives.workspace = true

near-jsonrpc-client.workspace = true
near-chain-configs.workspace = true
near-indexer-primitives.workspace = true
near-lake-framework.workspace = true

[features]
default = ["scylla_db"]
postgres_db = ["database/postgres_db"]
scylla_db = ["database/scylla_db"]
scylla_db_tracing = ["database/scylla_db_tracing", "scylla_db"]
#[features]
#default = ["scylla_db"]
#postgres_db = ["database/postgres_db"]
#scylla_db = ["database/scylla_db"]
#scylla_db_tracing = ["database/scylla_db_tracing", "scylla_db"]
2 changes: 2 additions & 0 deletions epoch-indexer/src/lib.rs
@@ -1,3 +1,5 @@
extern crate database_new as database;

use near_indexer_primitives::{near_primitives, CryptoHash};

pub async fn get_epoch_validators(
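
The extern crate database_new as database; line (repeated in the binaries below) aliases the new crate under the old name, so the existing database::... paths in these crates keep compiling without edits; declaring the dependency as database = { package = "database-new", path = "database-new" } in Cargo.toml would be a per-manifest way to achieve the same rename. A minimal sketch of what the alias buys, using an import that appears later in this diff:

// Hedged sketch: the alias keeps legacy paths working in this crate.
extern crate database_new as database;

// Existing imports continue to resolve against the renamed crate, e.g.:
use database::StateIndexerDbManager;
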
21 changes: 11 additions & 10 deletions epoch-indexer/src/main.rs
@@ -1,3 +1,5 @@
extern crate database_new as database;

use crate::config::{Opts, StartOptions};
use clap::Parser;
use database::StateIndexerDbManager;
@@ -52,17 +54,16 @@ async fn main() -> anyhow::Result<()> {

let opts: Opts = Opts::parse();

#[cfg(feature = "scylla_db")]
let db_manager = database::prepare_db_manager::<
database::scylladb::state_indexer::ScyllaDBManager,
>(&indexer_config.database)
.await?;
// #[cfg(feature = "scylla_db")]
let db_manager =
database::prepare_db_manager::<database::PostgresDBManager>(&indexer_config.database)
.await?;

#[cfg(all(feature = "postgres_db", not(feature = "scylla_db")))]
let db_manager = database::prepare_db_manager::<
database::postgres::state_indexer::PostgresDBManager,
>(&indexer_config.database)
.await?;
// #[cfg(all(feature = "postgres_db", not(feature = "scylla_db")))]
// let db_manager = database::prepare_db_manager::<
// database::postgres::state_indexer::PostgresDBManager,
// >(&indexer_config.database)
// .await?;

let indexer_id = &indexer_config.general.indexer_id;
let s3_client = indexer_config.lake_config.lake_s3_client().await;
12 changes: 6 additions & 6 deletions near-state-indexer/Cargo.toml
@@ -34,17 +34,17 @@ tokio-stream = "0.1"
tracing = "0.1.34"

configuration.workspace = true
database.workspace = true
database-new.workspace = true

near-o11y.workspace = true
near-chain-configs.workspace = true
near-client.workspace = true
near-indexer.workspace = true

[features]
default = ["scylla_db"]
#default = ["scylla_db"]
tracing-instrumentation = ["configuration/tracing-instrumentation"]
postgres_db = ["database/postgres_db"]
scylla_db = ["database/scylla_db"]
scylla_db_tracing = ["database/scylla_db_tracing", "scylla_db"]
account_access_keys = ["database/account_access_keys"]
#postgres_db = ["database/postgres_db"]
#scylla_db = ["database/scylla_db"]
#scylla_db_tracing = ["database/scylla_db_tracing", "scylla_db"]
#account_access_keys = ["database/account_access_keys"]
21 changes: 11 additions & 10 deletions near-state-indexer/src/main.rs
@@ -1,3 +1,5 @@
extern crate database_new as database;

use clap::Parser;
use futures::StreamExt;
use near_indexer::near_primitives;
@@ -274,16 +276,15 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
configuration::read_configuration::<configuration::NearStateIndexerConfig>().await?;

tracing::info!(target: INDEXER, "Connecting to db...");
#[cfg(feature = "scylla_db")]
let db_manager = database::prepare_db_manager::<
database::scylladb::state_indexer::ScyllaDBManager,
>(&state_indexer_config.database)
.await?;
#[cfg(all(feature = "postgres_db", not(feature = "scylla_db")))]
let db_manager = database::prepare_db_manager::<
database::postgres::state_indexer::PostgresDBManager,
>(&state_indexer_config.database)
.await?;
// #[cfg(feature = "scylla_db")]
let db_manager =
database::prepare_db_manager::<database::PostgresDBManager>(&state_indexer_config.database)
.await?;
// #[cfg(all(feature = "postgres_db", not(feature = "scylla_db")))]
// let db_manager = database::prepare_db_manager::<
// database::postgres::state_indexer::PostgresDBManager,
// >(&state_indexer_config.database)
// .await?;

tracing::info!(target: INDEXER, "Connecting to redis...");
let redis_client = redis::Client::open(state_indexer_config.general.redis_url.clone())?