Commit 4bd14ad: Http API

artjoma committed Oct 16, 2023 (1 parent: 00f3c9c)

Showing 6 changed files with 122 additions and 31 deletions.
14 changes: 11 additions & 3 deletions README.md
@@ -2,7 +2,7 @@

This application extracts EVM logs from an Erigon database (libmdbx) and stores them in a Parquet binary format data file.
Example: ```asset/sepolia_10000_500000_all_logs.parquet```
-Parquet table schema:
+#### Parquet table schema:
```
message schema {
REQUIRED INT64 block_n;
@@ -18,9 +18,17 @@ message schema {
```
![dbeaver.png](asset%2Fdbeaver.png)

-Feature:
+#### Features:
* Extract logs by block number range: from - to
* Filter logs by contract address: optional
+* Execute a job via the REST API: `api/v1/exec-job?job_id=test_1&block_number_start=10000&block_number_end=3000000` (see the example call after this diff)

+#### start.sh Example:
+```shell
+export DB_PATH=/home/art/dev/sepolia-chaindata/
+export HTTP_ADDRESS=0.0.0.0
+export HTTP_PORT=9090
+export RESULT_PATH=/tmp
+
-TODO: Use http rpc for creating new job
+./erigon_db_reader
+```
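
With the server configured as in the start.sh example above, the new endpoint can be exercised with any HTTP client. A minimal sketch in Rust, assuming the server is reachable at 127.0.0.1:9090 and that the reqwest and tokio crates are available (an illustration only, neither is a dependency of this repository; a plain curl call works just as well):

```rust
// Sketch: call the exec-job endpoint and print the echoed job id.
// Assumptions: server from start.sh listening on 127.0.0.1:9090;
// reqwest and tokio are used here purely for illustration.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let url = "http://127.0.0.1:9090/api/v1/exec-job\
               ?job_id=test_1&block_number_start=10000&block_number_end=3000000";
    // The handler spawns the job in the background and returns immediately.
    let job_id = reqwest::get(url).await?.text().await?;
    println!("started job: {job_id}");
    Ok(())
}
```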
10 changes: 5 additions & 5 deletions src/engine/mod.rs
@@ -10,18 +10,18 @@ use std::path::PathBuf;
use std::sync::Arc;
use tokio::time::Instant;

-pub struct Engine {
+pub struct SearchEngine {
db_path: PathBuf,
result_storage: Arc<ResultStorage>,
}

pub const LOG_CHUNK_SIZE: usize = 10_000;

-impl Engine {
+impl SearchEngine {
pub fn new(db_path: PathBuf, result_storage: Arc<ResultStorage>) -> Self {
info!("Db path:{}", db_path.to_str().unwrap());

-Engine {
+SearchEngine {
db_path,
result_storage,
}
@@ -130,7 +130,7 @@ impl Engine {

#[cfg(test)]
mod test {
-use crate::engine::Engine;
+use crate::engine::SearchEngine;
use crate::storage::ResultStorage;
use crate::util::setup_log;
use std::path::PathBuf;
@@ -142,7 +142,7 @@ mod test {
let result_path = "/tmp";
let db_path = "/home/art/dev/sepolia-chaindata/";
let result_storage = Arc::new(ResultStorage::new(PathBuf::from(result_path)));
-let engine = Arc::new(Engine::new(PathBuf::from(db_path), result_storage.clone()));
+let engine = Arc::new(SearchEngine::new(PathBuf::from(db_path), result_storage.clone()));

engine
.execute_job("test-1".to_string(), 10_000, 3_000_000, None)
47 changes: 47 additions & 0 deletions src/http/mod.rs
@@ -0,0 +1,47 @@
mod types;

use crate::engine::{SearchEngine};
use ethers::types::{Address};
use rocket::{get, routes, State};
use std::str::FromStr;
use std::sync::Arc;
use crate::http::types::ExecJobRequest;

pub struct HttpApi {}

impl HttpApi {
pub async fn new(port: u16, address: String, search_engine: Arc<SearchEngine>) {
rocket::build()
.configure(rocket::Config {
address: address.parse().unwrap(),
port,
..rocket::Config::default()
})
.manage(search_engine)
.mount("/", routes![exec_job])
.launch()
.await
.expect("Err setup");
}
}

// Example: api/v1/exec-job?job_id=test_1&block_number_start=10000&block_number_end=3000000
#[get("/api/v1/exec-job?<query..>")]
async fn exec_job(search_engine: &State<Arc<SearchEngine>>, query: ExecJobRequest) -> String {
let _search_engine = search_engine.inner().clone();
let _job_id = query.job_id.clone();

tokio::spawn(async move {
_search_engine.execute_job(
query.job_id,
query.block_number_start,
query.block_number_end,
query
.contract
.map(|c| Address::from_str(c.as_str()).unwrap())
).await.unwrap();
});


return _job_id;
}
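
The handler above is fire-and-forget: the HTTP response returns the job_id immediately, and the `unwrap()` calls can only panic inside the detached task, where nothing observes them. A hedged alternative that validates the contract address before spawning and logs failures might look like the sketch below; it assumes `execute_job` returns a `Result` whose error type implements `Debug`, which this diff does not show:

```rust
// Sketch (assumptions noted above), as an alternative handler body.
#[get("/api/v1/exec-job?<query..>")]
async fn exec_job(search_engine: &State<Arc<SearchEngine>>, query: ExecJobRequest) -> String {
    // Validate the optional contract address before detaching the task,
    // so a malformed parameter is reported to the caller, not a panic.
    let contract = match query.contract.as_deref().map(Address::from_str) {
        Some(Err(_)) => return "invalid contract address".to_string(),
        other => other.map(|r| r.unwrap()), // only Ok values remain here
    };
    let search_engine = search_engine.inner().clone();
    let job_id = query.job_id.clone();
    tokio::spawn(async move {
        if let Err(e) = search_engine
            .execute_job(
                query.job_id,
                query.block_number_start,
                query.block_number_end,
                contract,
            )
            .await
        {
            log::error!("job failed: {:?}", e);
        }
    });
    job_id
}
```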
10 changes: 10 additions & 0 deletions src/http/types.rs
@@ -0,0 +1,10 @@

use rocket::{FromForm};

#[derive(FromForm, Debug)]
pub struct ExecJobRequest {
pub job_id: String,
pub block_number_start: u64,
pub block_number_end: u64,
pub contract: Option<String>,
}
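
For reference, Rocket's `FromForm` derive binds the README's example query string to this struct roughly as follows (an illustration, not part of the commit):

```rust
// Assumed binding for:
//   api/v1/exec-job?job_id=test_1&block_number_start=10000&block_number_end=3000000
fn example_request() -> ExecJobRequest {
    ExecJobRequest {
        job_id: "test_1".to_string(),
        block_number_start: 10_000,
        block_number_end: 3_000_000,
        contract: None, // `contract` is absent from the query string
    }
}
```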
13 changes: 11 additions & 2 deletions src/main.rs
@@ -2,25 +2,34 @@ mod engine
mod storage;
mod types;
mod util;
+mod http;

use std::path::PathBuf;

-use crate::engine::Engine;
+use crate::engine::SearchEngine;
use crate::storage::ResultStorage;
use crate::types::AppCfg;
use crate::util::setup_log;
use clap::Parser;
use log::info;
use std::sync::Arc;
+use crate::http::HttpApi;

#[tokio::main]
async fn main() {
setup_log();
info!("Start app");
let app_cfg = AppCfg::parse();
let result_storage = Arc::new(ResultStorage::new(PathBuf::from(app_cfg.result_path)));
-let engine = Arc::new(Engine::new(
+let search_engine = Arc::new(SearchEngine::new(
PathBuf::from(app_cfg.db_path),
result_storage.clone(),
));

+HttpApi::new(
+app_cfg.http_port,
+app_cfg.http_address,
+search_engine.clone(),
+)
+.await;
}
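
AppCfg itself is not part of this diff, but start.sh and the call above imply it gained `http_address` and `http_port` fields. A hypothetical sketch of the implied shape, assuming clap's `env` feature (field names are inferred, not confirmed by the diff; the real definition lives in src/types.rs):

```rust
// Hypothetical AppCfg, inferred from start.sh and main.rs above.
use clap::Parser;

#[derive(Parser, Debug)]
pub struct AppCfg {
    #[arg(long, env = "DB_PATH")]
    pub db_path: String,
    #[arg(long, env = "HTTP_ADDRESS")]
    pub http_address: String,
    #[arg(long, env = "HTTP_PORT")]
    pub http_port: u16,
    #[arg(long, env = "RESULT_PATH")]
    pub result_path: String,
}
```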
59 changes: 38 additions & 21 deletions src/storage/mod.rs
@@ -69,27 +69,28 @@ impl ResultStorage {
.set_compression(Compression::SNAPPY)
.build();
let mut writer = SerializedFileWriter::new(file, schema, props.into()).unwrap();
+// reuse
+let mut block_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut tx_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut ctr_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut op_code_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_0_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_0_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_1_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_1_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_2_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_2_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_3_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut topic_3_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut data_col = Vec::with_capacity(LOG_CHUNK_SIZE);
+let mut data_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);

while let Some(chunk) = rcv.recv().await {
if chunk.is_none() {
break;
}
let chunk = chunk.unwrap();
info!("[{}] Chunk size: {:?}", _job_id, chunk.txs.len());
-let mut block_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut tx_n_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut ctr_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut op_code_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_0_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_0_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_1_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_1_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_2_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_2_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_3_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut topic_3_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut data_col = Vec::with_capacity(LOG_CHUNK_SIZE);
-let mut data_def_level_col = Vec::with_capacity(LOG_CHUNK_SIZE);

for _log_model in chunk.txs {
let (block_n, tx_n, logs) = _log_model;
@@ -98,28 +99,28 @@
ctr_col.push(ByteArray::from(logs.address.as_bytes()));
op_code_col.push(logs.topics.len() as i32);

-Self::save_topic(
+Self::populate_topic(
&mut topic_0_col,
&mut topic_0_def_level_col,
logs.topics.get(0),
);
-Self::save_topic(
+Self::populate_topic(
&mut topic_1_col,
&mut topic_1_def_level_col,
logs.topics.get(1),
);
-Self::save_topic(
+Self::populate_topic(
&mut topic_2_col,
&mut topic_2_def_level_col,
logs.topics.get(2),
);
-Self::save_topic(
+Self::populate_topic(
&mut topic_3_col,
&mut topic_3_def_level_col,
logs.topics.get(3),
);

-Self::save_binary(&mut data_col, &mut data_def_level_col, logs.data);
+Self::populate_binary(&mut data_col, &mut data_def_level_col, logs.data);
}

let mut row_group_writer = writer.next_row_group().unwrap();
@@ -186,6 +187,22 @@ impl ResultStorage {
.unwrap();
col_writer.close().unwrap();
row_group_writer.close().unwrap();

+// free mem
+block_n_col.clear();
+tx_n_col.clear();
+ctr_col.clear();
+op_code_col.clear();
+topic_0_col.clear();
+topic_0_def_level_col.clear();
+topic_1_col.clear();
+topic_1_def_level_col.clear();
+topic_2_col.clear();
+topic_2_def_level_col.clear();
+topic_3_col.clear();
+topic_3_def_level_col.clear();
+data_col.clear();
+data_def_level_col.clear();
}

info!("[{}] Stop result writer.", job_id);
@@ -195,7 +212,7 @@
(sender, worker)
}

-fn save_topic(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, topic: Option<&H256>) {
+fn populate_topic(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, topic: Option<&H256>) {
if let Some(data) = topic {
col.push(ByteArray::from(data.as_bytes()));
def_level_col.push(1_i16);
Expand All @@ -205,7 +222,7 @@ impl ResultStorage {
}
}

-fn save_binary(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, binary: Option<Bytes>) {
+fn populate_binary(col: &mut Vec<ByteArray>, def_level_col: &mut Vec<i16>, binary: Option<Bytes>) {
if let Some(data) = binary {
if data.is_empty() {
col.push(ByteArray::from(vec![]));
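
The substantive change in this file hoists the fourteen column buffers out of the per-chunk loop and clears them after each row group is written: `Vec::clear` drops the elements but keeps the allocation, so every chunk reuses the same LOG_CHUNK_SIZE-capacity buffers instead of reallocating. A minimal sketch of the pattern (illustration only, not the commit's code):

```rust
// Illustration of the reuse pattern adopted above: allocate once,
// clear after each batch; clear() keeps the Vec's capacity, so pushes
// for the next batch do not reallocate.
const CHUNK_SIZE: usize = 10_000;

fn write_batches(batches: Vec<Vec<i64>>) {
    let mut col: Vec<i64> = Vec::with_capacity(CHUNK_SIZE); // one allocation
    for batch in batches {
        col.extend(batch); // fill the reusable buffer for this batch
        // ... hand `col` to the row-group writer here ...
        col.clear(); // drop the rows, keep the capacity
    }
}
```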
