Skip to content

Commit

Permalink
Merge pull request #227 from blockworks-foundation/postgres_saving_bl…
Browse files Browse the repository at this point in the history
…ocks

Saving block and transactions in postgres table
  • Loading branch information
godmodegalactus authored Oct 9, 2023
2 parents 68033c1 + fe3509c commit 53a84b1
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 156 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/src/traits/block_storage_interface.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::structures::produced_block::ProducedBlock;
use anyhow::Result;
use async_trait::async_trait;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
Expand All @@ -7,11 +8,12 @@ use std::{ops::Range, sync::Arc};
#[async_trait]
pub trait BlockStorageInterface: Send + Sync {
// will save a block
async fn save(&self, block: ProducedBlock);
async fn save(&self, block: ProducedBlock) -> Result<()>;
// will get a block
async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option<ProducedBlock>;
async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result<ProducedBlock>;
// will get range of slots that are stored in the storage
async fn get_slot_range(&self) -> Range<Slot>;
}

pub type BlockStorageImpl = Arc<dyn BlockStorageInterface>;
pub const BLOCK_NOT_FOUND: &str = "Block not found";
1 change: 1 addition & 0 deletions history/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ log = {workspace = true}
chrono = {workspace = true}
bincode = {workspace = true}
base64 = {workspace = true}
itertools = {workspace = true}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
17 changes: 12 additions & 5 deletions history/src/block_stores/inmemory_block_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment, structures::produced_block::ProducedBlock,
traits::block_storage_interface::BlockStorageInterface,
commitment_utils::Commitment,
structures::produced_block::ProducedBlock,
traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND},
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
Expand Down Expand Up @@ -52,12 +53,18 @@ impl InmemoryBlockStore {

#[async_trait]
impl BlockStorageInterface for InmemoryBlockStore {
async fn save(&self, block: ProducedBlock) {
async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> {
self.store(block).await;
Ok(())
}

async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option<ProducedBlock> {
self.block_storage.read().await.get(&slot).cloned()
async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result<ProducedBlock> {
self.block_storage
.read()
.await
.get(&slot)
.cloned()
.ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND))
}

async fn get_slot_range(&self) -> Range<Slot> {
Expand Down
30 changes: 16 additions & 14 deletions history/src/block_stores/multiple_strategy_block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
// Fetches legacy blocks from faithful

use crate::block_stores::inmemory_block_store::InmemoryBlockStore;
use anyhow::{bail, Result};
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::produced_block::ProducedBlock,
traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface},
traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND},
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
Expand Down Expand Up @@ -42,7 +43,7 @@ impl MultipleStrategyBlockStorage {
}
}

pub async fn get_in_memory_block(&self, slot: Slot) -> Option<ProducedBlock> {
pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result<ProducedBlock> {
self.inmemory_for_storage
.get(
slot,
Expand All @@ -60,46 +61,47 @@ impl MultipleStrategyBlockStorage {

#[async_trait]
impl BlockStorageInterface for MultipleStrategyBlockStorage {
async fn save(&self, block: ProducedBlock) {
async fn save(&self, block: ProducedBlock) -> Result<()> {
let slot = block.slot;
let commitment = Commitment::from(block.commitment_config);
match commitment {
Commitment::Confirmed | Commitment::Processed => {
self.inmemory_for_storage.save(block).await;
self.inmemory_for_storage.save(block).await?;
}
Commitment::Finalized => {
let block_in_mem = self.get_in_memory_block(block.slot).await;
match block_in_mem {
Some(block_in_mem) => {
Ok(block_in_mem) => {
// check if inmemory blockhash is same as finalized, update it if they are not
// we can have two machines with same identity publishing two different blocks on same slot
if block_in_mem.blockhash != block.blockhash {
self.inmemory_for_storage.save(block.clone()).await;
self.inmemory_for_storage.save(block.clone()).await?;
}
}
None => self.inmemory_for_storage.save(block.clone()).await,
Err(_) => self.inmemory_for_storage.save(block.clone()).await?,
}
self.persistent_block_storage.save(block).await;
self.persistent_block_storage.save(block).await?;
}
};
if slot > self.last_confirmed_slot.load(Ordering::Relaxed) {
self.last_confirmed_slot.store(slot, Ordering::Relaxed);
}
Ok(())
}

async fn get(
&self,
slot: solana_sdk::slot_history::Slot,
config: RpcBlockConfig,
) -> Option<ProducedBlock> {
) -> Result<ProducedBlock> {
let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed);
if slot > last_confirmed_slot {
None
bail!(BLOCK_NOT_FOUND);
} else {
let range = self.inmemory_for_storage.get_slot_range().await;
if range.contains(&slot) {
let block = self.inmemory_for_storage.get(slot, config).await;
if block.is_some() {
if block.is_ok() {
return block;
}
}
Expand All @@ -113,15 +115,15 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {
.get_block_with_config(slot, config)
.await
{
Ok(block) => Some(ProducedBlock::from_ui_block(
Ok(block) => Ok(ProducedBlock::from_ui_block(
block,
slot,
CommitmentConfig::finalized(),
)),
Err(_) => None,
Err(_) => bail!(BLOCK_NOT_FOUND),
}
} else {
None
bail!(BLOCK_NOT_FOUND);
}
}
}
Expand Down
100 changes: 100 additions & 0 deletions history/src/block_stores/postgres_block_store.rs
Original file line number Diff line number Diff line change
@@ -1 +1,101 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use itertools::Itertools;
use solana_lite_rpc_core::{
structures::{epoch::EpochCache, produced_block::ProducedBlock},
traits::block_storage_interface::BlockStorageInterface,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{slot_history::Slot, stake_history::Epoch};
use tokio::sync::RwLock;

use crate::postgres::{
postgres_block::PostgresBlock, postgres_session::PostgresSessionCache,
postgres_transaction::PostgresTransaction,
};

#[derive(Default, Clone, Copy)]
pub struct PostgresData {
from_slot: Slot,
to_slot: Slot,
current_epoch: Epoch,
}

pub struct PostgresBlockStore {
session_cache: PostgresSessionCache,
epoch_cache: EpochCache,
postgres_data: Arc<RwLock<PostgresData>>,
}

impl PostgresBlockStore {
pub async fn start_new_epoch(&self, schema: &String) -> Result<()> {
// create schema for new epoch
let session = self
.session_cache
.get_session()
.await
.expect("should get new postgres session");

let statement = format!("CREATE SCHEMA {};", schema);
session.execute(&statement, &[]).await?;

// Create blocks table
let statement = PostgresBlock::create_statement(schema);
session.execute(&statement, &[]).await?;

// create transaction table
let statement = PostgresTransaction::create_statement(schema);
session.execute(&statement, &[]).await?;
Ok(())
}
}

#[async_trait]
impl BlockStorageInterface for PostgresBlockStore {
async fn save(&self, block: ProducedBlock) -> Result<()> {
let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await };

let slot = block.slot;
let transactions = block
.transactions
.iter()
.map(|x| PostgresTransaction::new(x, slot))
.collect_vec();
let postgres_block = PostgresBlock::from(&block);

let epoch = self.epoch_cache.get_epoch_at_slot(slot);
let schema = format!("EPOCH_{}", epoch.epoch);
if current_epoch == 0 || current_epoch < epoch.epoch {
self.postgres_data.write().await.current_epoch = epoch.epoch;
self.start_new_epoch(&schema).await?;
}

const NUMBER_OF_TRANSACTION: usize = 20;

// save transaction
let chunks = transactions.chunks(NUMBER_OF_TRANSACTION);
let session = self
.session_cache
.get_session()
.await
.expect("should get new postgres session");
for chunk in chunks {
PostgresTransaction::save_transactions(&session, &schema, chunk).await?;
}
postgres_block.save(&session, &schema).await?;
Ok(())
}

async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result<ProducedBlock> {
let range = self.get_slot_range().await;
if range.contains(&slot) {}
todo!()
}

async fn get_slot_range(&self) -> std::ops::Range<Slot> {
let lk = self.postgres_data.read().await;
lk.from_slot..lk.to_slot + 1
}
}
74 changes: 56 additions & 18 deletions history/src/postgres/postgres_block.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,83 @@
use super::postgres_session::SchemaSize;
use solana_lite_rpc_core::{
commitment_utils::Commitment, encoding::BASE64, structures::produced_block::ProducedBlock,
};
use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::ProducedBlock};
use tokio_postgres::types::ToSql;

use super::postgres_session::PostgresSession;

#[derive(Debug)]
pub struct PostgresBlock {
pub leader_id: Option<String>,
pub slot: i64,
pub blockhash: String,
pub block_height: i64,
pub slot: i64,
pub parent_slot: i64,
pub block_time: i64,
pub commitment_config: i8,
pub previous_blockhash: String,
pub rewards: Option<String>,
}

impl SchemaSize for PostgresBlock {
const DEFAULT_SIZE: usize = 4 * 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
}
const NB_ARUMENTS: usize = 7;

impl From<ProducedBlock> for PostgresBlock {
fn from(value: ProducedBlock) -> Self {
let commitment = Commitment::from(&value.commitment_config);
impl From<&ProducedBlock> for PostgresBlock {
fn from(value: &ProducedBlock) -> Self {
let rewards = value
.rewards
.as_ref()
.map(|x| BASE64.serialize(x).ok())
.unwrap_or(None);

Self {
leader_id: value.leader_id,
blockhash: value.blockhash,
blockhash: value.blockhash.clone(),
block_height: value.block_height as i64,
slot: value.slot as i64,
parent_slot: value.parent_slot as i64,
block_time: value.block_time as i64,
commitment_config: commitment as i8,
previous_blockhash: value.previous_blockhash,
previous_blockhash: value.previous_blockhash.clone(),
rewards,
}
}
}

impl PostgresBlock {
pub fn create_statement(schema: &String) -> String {
format!(
"
CREATE TABLE {}.BLOCKS (
slot BIGINT PRIMARY KEY,
blockhash STRING NOT NULL,
leader_id STRING,
block_height BIGINT NOT NULL,
parent_slot BIGINT NOT NULL,
block_time BIGINT NOT NULL,
previous_blockhash STRING NOT NULL,
rewards STRING,
);
",
schema
)
}

pub async fn save(
&self,
postgres_session: &PostgresSession,
schema: &String,
) -> anyhow::Result<()> {
let mut query = format!(
r#"
INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES
"#,
schema
);

let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARUMENTS);
args.push(&self.slot);
args.push(&self.blockhash);
args.push(&self.block_height);
args.push(&self.parent_slot);
args.push(&self.block_time);
args.push(&self.previous_blockhash);
args.push(&self.rewards);

PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]);
postgres_session.execute(&query, &args).await?;
Ok(())
}
}
Loading

0 comments on commit 53a84b1

Please sign in to comment.