diff --git a/README.md b/README.md
index c3c2c60..a0325dd 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@ This application extracts EVM logs from an Erigon database (libmdbx) and stores them in a
 Parquet binary data file.
 Example: ```asset/sepolia_10000_500000_all_logs.parquet```
 
-Parquet table schema:
+#### Parquet table schema:
 ```
 message schema {
     REQUIRED INT64 block_n;
@@ -18,9 +18,17 @@ message schema {
 ```
 ![dbeaver.png](asset%2Fdbeaver.png)
 
-Feature:
+#### Features:
 * Extract logs by block number range: from - to
 * Filter logs by contract address: optional
+* Execute a job via the REST API: `api/v1/exec-job?job_id=test_1&block_number_start=10000&block_number_end=3000000`
 
+#### start.sh example:
+```shell
+export DB_PATH=/home/art/dev/sepolia-chaindata/
+export HTTP_ADDRESS=0.0.0.0
+export HTTP_PORT=9090
+export RESULT_PATH=/tmp
 
-TODO: Use http rpc for creating new job
+./erigon_db_reader
+```
\ No newline at end of file
diff --git a/src/engine/mod.rs b/src/engine/mod.rs
index 7b35f20..029f024 100644
--- a/src/engine/mod.rs
+++ b/src/engine/mod.rs
@@ -10,18 +10,18 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use tokio::time::Instant;
 
-pub struct Engine {
+pub struct SearchEngine {
     db_path: PathBuf,
     result_storage: Arc<ResultStorage>,
 }
 
 pub const LOG_CHUNK_SIZE: usize = 10_000;
 
-impl Engine {
+impl SearchEngine {
     pub fn new(db_path: PathBuf, result_storage: Arc<ResultStorage>) -> Self {
         info!("Db path:{}", db_path.to_str().unwrap());
-        Engine {
+        SearchEngine {
             db_path,
             result_storage,
         }
@@ -130,7 +130,7 @@ impl Engine {
 
 #[cfg(test)]
 mod test {
-    use crate::engine::Engine;
+    use crate::engine::SearchEngine;
     use crate::storage::ResultStorage;
     use crate::util::setup_log;
     use std::path::PathBuf;
@@ -142,7 +142,7 @@ mod test {
         let result_path = "/tmp";
         let db_path = "/home/art/dev/sepolia-chaindata/";
         let result_storage = Arc::new(ResultStorage::new(PathBuf::from(result_path)));
-        let engine = Arc::new(Engine::new(PathBuf::from(db_path), result_storage.clone()));
+        let engine = Arc::new(SearchEngine::new(PathBuf::from(db_path), result_storage.clone()));
         engine
             .execute_job("test-1".to_string(), 10_000, 3_000_000, None)
diff --git a/src/http/mod.rs b/src/http/mod.rs
new file mode 100644
index 0000000..d9c1ce4
--- /dev/null
+++ b/src/http/mod.rs
@@ -0,0 +1,47 @@
+mod types;
+
+use crate::engine::SearchEngine;
+use ethers::types::Address;
+use rocket::{get, routes, State};
+use std::str::FromStr;
+use std::sync::Arc;
+use crate::http::types::ExecJobRequest;
+
+pub struct HttpApi {}
+
+impl HttpApi {
+    pub async fn new(port: u16, address: String, search_engine: Arc<SearchEngine>) {
+        rocket::build()
+            .configure(rocket::Config {
+                address: address.parse().unwrap(),
+                port,
+                ..rocket::Config::default()
+            })
+            .manage(search_engine)
+            .mount("/", routes![exec_job])
+            .launch()
+            .await
+            .expect("Err setup");
+    }
+}
+
+// Example: api/v1/exec-job?job_id=test_1&block_number_start=10000&block_number_end=3000000
+#[get("/api/v1/exec-job?<query..>")]
+async fn exec_job(search_engine: &State<Arc<SearchEngine>>, query: ExecJobRequest) -> String {
+    let _search_engine = search_engine.inner().clone();
+    let _job_id = query.job_id.clone();
+
+    tokio::spawn(async move {
+        _search_engine.execute_job(
+            query.job_id,
+            query.block_number_start,
+            query.block_number_end,
+            query
+                .contract
+                .map(|c| Address::from_str(c.as_str()).unwrap()),
+        ).await.unwrap();
+    });
+
+    _job_id
+}
diff --git a/src/http/types.rs b/src/http/types.rs
new file mode 100644
index 0000000..0666a5e
--- /dev/null
+++ b/src/http/types.rs
@@ -0,0 +1,10 @@
+use rocket::FromForm;
+
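+/// Query parameters for `/api/v1/exec-job`. `contract` is an optional
+/// hex-encoded contract address used to filter the extracted logs.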
+#[derive(FromForm, Debug)]
+pub struct ExecJobRequest {
+    pub job_id: String,
+    pub block_number_start: u64,
+    pub block_number_end: u64,
+    pub contract: Option<String>,
+}
diff --git a/src/main.rs b/src/main.rs
index a749806..f0e2db0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,16 +2,18 @@ mod engine;
 mod storage;
 mod types;
 mod util;
+mod http;
 
 use std::path::PathBuf;
-use crate::engine::Engine;
+use crate::engine::SearchEngine;
 use crate::storage::ResultStorage;
 use crate::types::AppCfg;
 use crate::util::setup_log;
 use clap::Parser;
 use log::info;
 use std::sync::Arc;
+use crate::http::HttpApi;
 
 #[tokio::main]
 async fn main() {
@@ -19,8 +21,15 @@ async fn main() {
     info!("Start app");
     let app_cfg = AppCfg::parse();
     let result_storage = Arc::new(ResultStorage::new(PathBuf::from(app_cfg.result_path)));
-    let engine = Arc::new(Engine::new(
+    let search_engine = Arc::new(SearchEngine::new(
         PathBuf::from(app_cfg.db_path),
         result_storage.clone(),
     ));
+
+    HttpApi::new(
+        app_cfg.http_port,
+        app_cfg.http_address,
+        search_engine.clone(),
+    )
+    .await;
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index bb73135..889cde8 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -69,6 +69,21 @@ impl ResultStorage {
             .set_compression(Compression::SNAPPY)
             .build();
         let mut writer = SerializedFileWriter::new(file, schema, props.into()).unwrap();
+        // Reused column buffers: allocated once, cleared after each chunk.
+        let mut block_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut tx_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut ctr_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut op_code_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_0_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_0_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_1_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_1_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_2_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_2_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_3_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut topic_3_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut data_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+        let mut data_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
 
         while let Some(chunk) = rcv.recv().await {
             if chunk.is_none() {
@@ -76,20 +91,6 @@ impl ResultStorage {
             }
             let chunk = chunk.unwrap();
             info!("[{}] Chunk size: {:?}", _job_id, chunk.txs.len());
-            let mut block_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut tx_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut ctr_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut op_code_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_0_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_0_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_1_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_1_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_2_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_2_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_3_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut topic_3_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut data_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-            let mut data_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
 
             for _log_model in chunk.txs {
                 let (block_n, tx_n, logs) = _log_model;
@@ -98,28 +99,28 @@ impl ResultStorage {
                 ctr_col.push(ByteArray::from(logs.address.as_bytes()));
                 op_code_col.push(logs.topics.len() as i32);
 
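+                // Topic columns are nullable: the def-level vec records 1
+                // for a present topic and 0 for a missing one.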
-                Self::save_topic(
+                Self::populate_topic(
                     &mut topic_0_col,
                     &mut topic_0_def_level_col,
                     logs.topics.get(0),
                 );
-                Self::save_topic(
+                Self::populate_topic(
                     &mut topic_1_col,
                     &mut topic_1_def_level_col,
                     logs.topics.get(1),
                 );
-                Self::save_topic(
+                Self::populate_topic(
                     &mut topic_2_col,
                     &mut topic_2_def_level_col,
                     logs.topics.get(2),
                 );
-                Self::save_topic(
+                Self::populate_topic(
                     &mut topic_3_col,
                     &mut topic_3_def_level_col,
                     logs.topics.get(3),
                 );
-                Self::save_binary(&mut data_col, &mut data_def_level_col, logs.data);
+                Self::populate_binary(&mut data_col, &mut data_def_level_col, logs.data);
             }
 
             let mut row_group_writer = writer.next_row_group().unwrap();
@@ -186,6 +187,22 @@ impl ResultStorage {
                 .unwrap();
             col_writer.close().unwrap();
             row_group_writer.close().unwrap();
+
+            // Reset the reused buffers for the next chunk; clear() keeps the allocated capacity.
+            block_n_col.clear();
+            tx_n_col.clear();
+            ctr_col.clear();
+            op_code_col.clear();
+            topic_0_col.clear();
+            topic_0_def_level_col.clear();
+            topic_1_col.clear();
+            topic_1_def_level_col.clear();
+            topic_2_col.clear();
+            topic_2_def_level_col.clear();
+            topic_3_col.clear();
+            topic_3_def_level_col.clear();
+            data_col.clear();
+            data_def_level_col.clear();
         }
 
         info!("[{}] Stop result writer.", job_id);
@@ -195,7 +212,7 @@ impl ResultStorage {
         (sender, worker)
     }
 
-    fn save_topic(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, topic: Option<&H256>) {
+    fn populate_topic(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, topic: Option<&H256>) {
         if let Some(data) = topic {
             col.push(ByteArray::from(data.as_bytes()));
             def_level_col.push(1_i16);
         } else {
@@ -205,7 +222,7 @@ impl ResultStorage {
         }
     }
 
-    fn save_binary(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, binary: Option<Bytes>) {
+    fn populate_binary(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, binary: Option<Bytes>) {
         if let Some(data) = binary {
             if data.is_empty() {
                 col.push(ByteArray::from(vec![]));