diff --git a/Cargo.lock b/Cargo.lock index 44a3e7a..59f5bcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1655,14 +1655,14 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32f05bccc8b6036fec4e0c511954e3997987a82acb6a0b50642ecf7c744fe225" dependencies = [ - "parse_arg 1.0.0", + "parse_arg 1.0.1", ] [[package]] name = "parse_arg" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aa7e02eed7573816a0edb0d7f7aed7cdd59a22d101c3e9dc5e5ea3b935d3346" +checksum = "5bddc33f680b79eaf1e2e56da792c3c2236f86985bbc3a886e8ddee17ae4d3a4" [[package]] name = "paste" diff --git a/client/src/client.rs b/client/src/client.rs index 2bb9223..f04cb6f 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -64,7 +64,7 @@ impl ConnectionDelegate for Delegate { TcpStream::connect(remote).unwrap_or_else(|err| { #[cfg(feature = "log")] log::error!("Unable to connect BP Node {remote} due to {err}"); - eprintln!("Unable to connect BP Node {remote}"); + eprintln!("Unable to connect BP Node {remote} due to {err}"); exit(1); }) } @@ -74,9 +74,9 @@ impl ConnectionDelegate for Delegate { log::info!("connection to the server is established"); } - fn on_disconnect(&mut self, err: Error, _attempt: usize) -> OnDisconnect { + fn on_disconnect(&mut self, _err: Error, _attempt: usize) -> OnDisconnect { #[cfg(feature = "log")] - log::error!("disconnected due to {err}"); + log::error!("disconnected due to {_err}"); OnDisconnect::Terminate } diff --git a/client/src/exporter.rs b/client/src/exporter.rs index fd62d2f..6093e0f 100644 --- a/client/src/exporter.rs +++ b/client/src/exporter.rs @@ -57,22 +57,27 @@ impl ConnectionDelegate for BlockExporter { fn connect(&mut self, remote: &RemoteAddr) -> Session { TcpStream::connect(remote).unwrap_or_else(|err| { + #[cfg(feature = "log")] log::error!(target: NAME, "Unable to connect BP Node {remote} due to {err}"); + #[cfg(feature = "log")] log::warn!(target: NAME, "Stopping RPC import thread"); exit(1); }) } fn on_established(&mut self, remote: SocketAddr, _attempt: usize) { + #[cfg(feature = "log")] log::info!(target: NAME, "Connected to BP Node {remote}, sending `hello(...)`"); } fn on_disconnect(&mut self, err: std::io::Error, _attempt: usize) -> OnDisconnect { + #[cfg(feature = "log")] log::error!(target: NAME, "BP Node got disconnected due to {err}"); exit(1) } fn on_io_error(&mut self, err: reactor::Error>) { + #[cfg(feature = "log")] log::error!(target: NAME, "I/O error in communicating with BP Node: {err}"); self.disconnect(); } @@ -85,14 +90,17 @@ impl ClientDelegate for BlockExporter { match msg { ImporterReply::Filters(filters) => { if self.filters_received { + #[cfg(feature = "log")] log::warn!(target: NAME, "Received duplicate filters"); } else { + #[cfg(feature = "log")] log::info!(target: NAME, "Received filters"); } self.filters = filters; self.filters_received = true; } ImporterReply::Error(failure) => { + #[cfg(feature = "log")] log::error!(target: NAME, "Received error from BP Node: {failure}"); self.disconnect(); } @@ -100,6 +108,7 @@ impl ClientDelegate for BlockExporter { } fn on_reply_unparsable(&mut self, err: ::Error) { + #[cfg(feature = "log")] log::error!("Invalid message from BP Node: {err}"); } } diff --git a/client/src/main.rs b/client/src/main.rs index d1722e2..380a4ff 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -56,7 +56,9 @@ fn cb(reply: Response) { Response::Failure(failure) => { println!("Failure: {failure}"); } - Response::Pong(_noise) => {} + Response::Pong(_noise) => { + println!("Pong from server"); + } Response::Status(status) => { println!("{}", serde_yaml::to_string(&status).unwrap()); } diff --git a/providers/bitcoincore/src/main.rs b/providers/bitcoincore/src/main.rs index bd4943f..42d15c7 100644 --- a/providers/bitcoincore/src/main.rs +++ b/providers/bitcoincore/src/main.rs @@ -39,7 +39,11 @@ use strict_encoding::Ident; pub const AGENT: &str = "BC_BP"; -pub const BLOCK_SEPARATOR: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9]; +pub const BITCOIN_BLOCK_SEPARATOR: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9]; +pub const TESTNET_BLOCK_SEPARATOR: [u8; 4] = [0x0B, 0x11, 0x09, 0x07]; +pub const TESTNET4_BLOCK_SEPARATOR: [u8; 4] = [0x1c, 0x16, 0x3f, 0x28]; +pub const SIGNET_BLOCK_SEPARATOR: [u8; 4] = [0x0A, 0x03, 0xCF, 0x40]; +pub const REGTEST_BLOCK_SEPARATOR: [u8; 4] = [0xFA, 0xBF, 0xB5, 0xDA]; /// Command-line arguments #[derive(Parser)] @@ -104,6 +108,15 @@ fn read_blocks(client: Client, args: Args) { exit(1); } + // Select the correct block separator according to the network type + let block_separator = match args.network { + Network::Mainnet => BITCOIN_BLOCK_SEPARATOR, + Network::Testnet3 => TESTNET_BLOCK_SEPARATOR, + Network::Testnet4 => TESTNET4_BLOCK_SEPARATOR, + Network::Signet => SIGNET_BLOCK_SEPARATOR, + Network::Regtest => REGTEST_BLOCK_SEPARATOR, + }; + let mut file_no: u32 = 0; let mut total_blocks: u32 = 0; let mut total_tx: u64 = 0; @@ -138,7 +151,13 @@ fn read_blocks(client: Client, args: Args) { exit(4); } } - if buf != BLOCK_SEPARATOR { + + if buf == [0x00, 0x00, 0x00, 0x00] { + log::info!("Reached end of block file"); + break; + } + + if buf != block_separator { log::error!( "Invalid block separator 0x{:02X}{:02X}{:02X}{:02X} before block #{block_no}", buf[0], diff --git a/src/bin/bpd.rs b/src/bin/bpd.rs index fc23d93..0a6d03b 100644 --- a/src/bin/bpd.rs +++ b/src/bin/bpd.rs @@ -27,16 +27,30 @@ extern crate clap; mod opts; use std::fs; +use std::path::Path; use std::process::{ExitCode, Termination, exit}; pub use bpnode; -use bpnode::{Broker, BrokerError, Config, PATH_INDEXDB}; +use bpnode::{Broker, BrokerError, Config, PATH_INDEXDB, initialize_db_tables}; +use bpwallet::Network; use clap::Parser; use loglevel::LogLevel; use redb::Database; use crate::opts::{Command, Opts}; +// Exit status codes for different error conditions +// see also constants in `db.rs` +const EXIT_PATH_ACCESS_ERROR: i32 = 1; +const EXIT_DB_EXISTS_ERROR: i32 = 2; +const EXIT_DIR_CREATE_ERROR: i32 = 3; +const EXIT_DB_CREATE_ERROR: i32 = 4; +const EXIT_DB_OPEN_ERROR: i32 = 5; +const EXIT_NETWORK_MISMATCH: i32 = 10; +const EXIT_NO_NETWORK_INFO: i32 = 11; +const EXIT_DB_NOT_FOUND: i32 = 12; + +/// Wrapper for result status to implement Termination trait struct Status(Result<(), BrokerError>); impl Termination for Status { @@ -58,37 +72,124 @@ fn main() -> Status { log::debug!("Command-line arguments: {:#?}", &opts); match opts.command { - Some(Command::Init) => { - eprint!("Initializing ... "); - let index_path = opts.general.data_dir.join(PATH_INDEXDB); - match fs::exists(&index_path) { - Err(err) => { - eprintln!("unable to access path '{}': {err}", index_path.display()); - exit(1); - } - Ok(true) => { - eprintln!("index database directory already exists, cancelling"); - exit(2); - } - Ok(false) => {} - } - if let Err(err) = fs::create_dir_all(&opts.general.data_dir) { + Some(Command::Init) => initialize_database(&opts), + None => run_node(opts), + } +} + +/// Initialize a new database for the BP Node +fn initialize_database(opts: &Opts) -> Status { + eprint!("Initializing ... "); + + // Prepare the database path + let index_path = opts.general.data_dir.join(PATH_INDEXDB); + + // Check if database already exists + if let Err(err) = check_db_path(&index_path, false) { + return err; + } + + // Create data directory if needed + if let Err(err) = fs::create_dir_all(&opts.general.data_dir) { + eprintln!( + "Unable to create data directory at '{}'\n{err}", + opts.general.data_dir.display() + ); + exit(EXIT_DIR_CREATE_ERROR); + } + + // Create the database + let db = match Database::create(&index_path) { + Ok(db) => db, + Err(err) => { + eprintln!("Unable to create index database.\n{err}"); + exit(EXIT_DB_CREATE_ERROR); + } + }; + + // Initialize database with network information and create all tables + let network = opts.general.network; + initialize_db_tables(&db, network); + + eprintln!("Index database initialized for {} network, exiting", network); + Status(Ok(())) +} + +/// Run the BP Node service +fn run_node(opts: Opts) -> Status { + let conf = Config::from(opts); + let index_path = conf.data_dir.join(PATH_INDEXDB); + + // Check if database exists + if let Err(err) = check_db_path(&index_path, true) { + return err; + } + + // Verify network configuration + verify_network_configuration(&index_path, &conf.network); + + // Start the broker service + Status(Broker::start(conf).and_then(|runtime| runtime.run())) +} + +/// Check if database path exists or not, depending on expected state +fn check_db_path(index_path: &Path, should_exist: bool) -> Result<(), Status> { + match fs::exists(index_path) { + Err(err) => { + eprintln!("Unable to access path '{}': {err}", index_path.display()); + exit(EXIT_PATH_ACCESS_ERROR); + } + Ok(exists) => { + if exists && !should_exist { + eprintln!("Index database directory already exists, cancelling"); + exit(EXIT_DB_EXISTS_ERROR); + } else if !exists && should_exist { eprintln!( - "unable to create data directory at '{}'\n{err}", - opts.general.data_dir.display() + "ERROR: Database not found! Please initialize with 'bpd init' command first." ); - exit(3); + exit(EXIT_DB_NOT_FOUND); } - if let Err(err) = Database::create(&index_path) { - eprintln!("unable to create index database.\n{err}"); - exit(4); - } - eprintln!("index database initialized, exiting"); - Status(Ok(())) - } - None => { - let conf = Config::from(opts); - Status(Broker::start(conf).and_then(|runtime| runtime.run())) } } + Ok(()) +} + +/// Verify that database network configuration matches the configured network +fn verify_network_configuration(index_path: &Path, configured_network: &Network) { + let Ok(db) = Database::open(index_path) + .inspect_err(|err| eprintln!("Error: could not open the database due to {err}")) + else { + exit(EXIT_DB_OPEN_ERROR) + }; + let Ok(tx) = db + .begin_read() + .inspect_err(|err| eprintln!("Error: could not access the database due to {err}")) + else { + exit(EXIT_DB_OPEN_ERROR) + }; + let Ok(main_table) = tx + .open_table(bpnode::db::TABLE_MAIN) + .inspect_err(|err| eprintln!("Error: could not open the main table due to {err}")) + else { + exit(EXIT_DB_OPEN_ERROR) + }; + let Ok(Some(network_rec)) = main_table.get(bpnode::REC_NETWORK) else { + // Network information isn't found in the database + eprintln!("ERROR: Database exists but doesn't contain network information."); + eprintln!("Please reinitialize the database with `bpd init` command."); + exit(EXIT_NO_NETWORK_INFO); + }; + let stored_network = String::from_utf8_lossy(network_rec.value()); + if stored_network != configured_network.to_string() { + eprintln!("ERROR: Database network mismatch!"); + eprintln!("Configured network: {}", configured_network); + eprintln!("Database network: {}", stored_network); + eprintln!("Each BP-Node instance works with a single chain."); + eprintln!( + "To use a different network, create a separate instance with a different data \ + directory." + ); + exit(EXIT_NETWORK_MISMATCH); + } + log::info!("Database network matches configured network: {}", stored_network); } diff --git a/src/blocks.rs b/src/blocks.rs index b0d91a9..ce3828b 100644 --- a/src/blocks.rs +++ b/src/blocks.rs @@ -24,21 +24,299 @@ //! Block importer interface organized into a reactor thread. use std::collections::HashSet; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use amplify::{ByteArray, FromSliceError}; use bprpc::BloomFilter32; use bpwallet::{Block, BlockHash}; use crossbeam_channel::{RecvError, SendError, Sender}; use microservices::USender; -use redb::{CommitError, ReadableTable, StorageError, TableError}; +use redb::{ + CommitError, ReadableTable, ReadableTableMetadata, StorageError, TableError, WriteTransaction, +}; use crate::ImporterMsg; use crate::db::{ - DbBlockHeader, DbMsg, DbTx, REC_TXNO, TABLE_BLKS, TABLE_MAIN, TABLE_TXES, TABLE_TXIDS, TxNo, + BlockId, DbBlock, DbBlockHeader, DbMsg, DbTx, ForkId, REC_BLOCKID, REC_FORK_ID, REC_TXNO, + TABLE_BLKS, TABLE_BLOCK_HEIGHTS, TABLE_BLOCK_SPENDS, TABLE_BLOCK_TXS, TABLE_BLOCKIDS, + TABLE_FORK_BLOCKS, TABLE_FORK_TIPS, TABLE_FORKS, TABLE_HEIGHTS, TABLE_INPUTS, TABLE_MAIN, + TABLE_ORPHAN_PARENTS, TABLE_ORPHANS, TABLE_OUTS, TABLE_SPKS, TABLE_TX_BLOCKS, TABLE_TXES, + TABLE_TXIDS, TABLE_UTXOS, TxNo, }; const NAME: &str = "blockproc"; +// TODO: Make this configuration options +// Constants for orphan block management +const MAX_ORPHAN_BLOCKS: usize = 100; +// Orphan blocks expire after 24 hours +const ORPHAN_EXPIRY_HOURS: u64 = 24; + +/// Table context for transaction processing +/// +/// This structure holds references to all database tables needed for transaction processing. +/// It helps avoid repeated opening of the same tables and provides a unified interface for +/// transaction processing logic that can be shared between different block processing functions. +struct TxTablesContext<'a> { + /// Maps transaction IDs to transaction numbers + txids_table: redb::Table<'a, [u8; 32], TxNo>, + + /// Maps transaction numbers to block IDs + tx_blocks_table: redb::Table<'a, TxNo, BlockId>, + + /// Tracks unspent transaction outputs (UTXOs) + utxos_table: redb::Table<'a, (TxNo, u32), ()>, + + /// Maps transaction inputs to the outputs they spend + inputs_table: redb::Table<'a, (TxNo, u32), (TxNo, u32)>, + + /// Maps transaction numbers to transactions that spend their outputs + outs_table: redb::Table<'a, TxNo, Vec>, + + /// Maps script public keys to transactions containing them + spks_table: redb::Table<'a, &'static [u8], Vec>, + + /// Stores complete transaction data + txes_table: redb::Table<'a, TxNo, DbTx>, + + /// Maps block IDs to transactions they contain + block_txs_table: redb::Table<'a, BlockId, Vec>, + + /// Records UTXOs spent in each block (for rollback purposes) + block_spends_table: redb::Table<'a, BlockId, Vec<(TxNo, u32)>>, +} + +impl<'a> TxTablesContext<'a> { + /// Creates a new transaction tables context from a database transaction + fn new(db: &'a WriteTransaction) -> Result { + Ok(Self { + txids_table: db + .open_table(TABLE_TXIDS) + .map_err(BlockProcError::TxidTable)?, + + tx_blocks_table: db + .open_table(TABLE_TX_BLOCKS) + .map_err(|e| BlockProcError::Custom(format!("Tx-blocks table error: {}", e)))?, + + utxos_table: db + .open_table(TABLE_UTXOS) + .map_err(|e| BlockProcError::Custom(format!("UTXOs table error: {}", e)))?, + + inputs_table: db + .open_table(TABLE_INPUTS) + .map_err(|e| BlockProcError::Custom(format!("Inputs table error: {}", e)))?, + + outs_table: db + .open_table(TABLE_OUTS) + .map_err(|e| BlockProcError::Custom(format!("Outs table error: {}", e)))?, + + spks_table: db + .open_table(TABLE_SPKS) + .map_err(|e| BlockProcError::Custom(format!("SPKs table error: {}", e)))?, + + txes_table: db + .open_table(TABLE_TXES) + .map_err(BlockProcError::TxesTable)?, + + block_txs_table: db + .open_table(TABLE_BLOCK_TXS) + .map_err(|e| BlockProcError::Custom(format!("Block-txs table error: {}", e)))?, + + block_spends_table: db + .open_table(TABLE_BLOCK_SPENDS) + .map_err(|e| BlockProcError::Custom(format!("Block spends table error: {}", e)))?, + }) + } + + /// Process a single transaction, handling all database operations + /// + /// This method abstracts the common logic for processing transactions in both + /// normal block processing and during chain reorganization. + /// + /// # Parameters + /// * `tx` - The transaction to process + /// * `block_id` - ID of the block containing the transaction + /// * `txno_counter` - Current transaction number counter (will be incremented if needed) + /// * `block_txs` - Vector to collect transaction IDs for this block + /// * `block_spends` - Vector to collect UTXOs spent in this block + /// + /// # Returns + /// * `Result<(TxNo, bool), BlockProcError>` - Transaction number and whether it's a new + /// transaction + fn process_transaction( + &mut self, + tx: &bpwallet::Tx, + block_id: BlockId, + txno_counter: &mut TxNo, + block_txs: &mut Vec, + block_spends: &mut Vec<(TxNo, u32)>, + ) -> Result<(TxNo, bool), BlockProcError> { + // Calculate transaction ID + let txid = tx.txid(); + let txid_bytes = txid.to_byte_array(); + + // Check if this txid already exists + let existing_txno = self + .txids_table + .get(txid_bytes) + .map_err(BlockProcError::TxidLookup)? + .map(|v| v.value()); + + // Get or assign transaction number + let txno = if let Some(existing) = existing_txno { + existing // Use existing transaction number + } else { + // Assign new transaction number + txno_counter.inc_assign(); + *txno_counter + }; + + // Add transaction to the list for this block + block_txs.push(txno); + + // Store transaction ID mapping (or update if needed) + self.txids_table + .insert(txid_bytes, txno) + .map_err(BlockProcError::TxidStorage)?; + + // Associate transaction with block ID + self.tx_blocks_table + .insert(txno, block_id) + .map_err(|e| BlockProcError::Custom(format!("Tx-blocks storage error: {}", e)))?; + + // Process transaction inputs + for (vin_idx, input) in tx.inputs.iter().enumerate() { + if !input.prev_output.is_coinbase() { + let prev_txid = input.prev_output.txid; + let prev_vout = input.prev_output.vout; + + // Look up previous transaction number + if let Some(prev_txno) = self + .txids_table + .get(prev_txid.to_byte_array()) + .map_err(BlockProcError::TxidLookup)? + .map(|v| v.value()) + { + // Mark UTXO as spent + self.utxos_table + .remove(&(prev_txno, prev_vout.into_u32())) + .map_err(|e| { + BlockProcError::Custom(format!("UTXOs removal error: {}", e)) + })?; + + // Record UTXO spent in this block + block_spends.push((prev_txno, prev_vout.into_u32())); + + // Record input-output mapping + self.inputs_table + .insert((txno, vin_idx as u32), (prev_txno, prev_vout.into_u32())) + .map_err(|e| { + BlockProcError::Custom(format!("Inputs storage error: {}", e)) + })?; + + // Update spending relationships + let mut spending_txs = self + .outs_table + .get(prev_txno) + .map_err(|e| BlockProcError::Custom(format!("Outs lookup error: {}", e)))? + .map(|v| v.value().to_vec()) + .unwrap_or_default(); + + // Avoid duplicate entries in fork case + if !spending_txs.contains(&txno) { + spending_txs.push(txno); + self.outs_table + .insert(prev_txno, spending_txs) + .map_err(|e| { + BlockProcError::Custom(format!("Outs update error: {}", e)) + })?; + } + } + } + } + + // Process transaction outputs + for (vout_idx, output) in tx.outputs.iter().enumerate() { + // Add new UTXO + self.utxos_table + .insert((txno, vout_idx as u32), ()) + .map_err(|e| BlockProcError::Custom(format!("UTXOs storage error: {}", e)))?; + + // Index script pubkey + let script = &output.script_pubkey; + if !script.is_empty() { + let mut txnos = self + .spks_table + .get(script.as_slice()) + .map_err(|e| BlockProcError::Custom(format!("SPKs lookup error: {}", e)))? + .map(|v| v.value().to_vec()) + .unwrap_or_default(); + + // Avoid duplicate entries in fork case + if !txnos.contains(&txno) { + txnos.push(txno); + self.spks_table + .insert(script.as_slice(), txnos) + .map_err(|e| BlockProcError::Custom(format!("SPKs update error: {}", e)))?; + } + } + } + + // Store complete transaction data if it's new + if existing_txno.is_none() { + self.txes_table + .insert(txno, DbTx::from(tx.clone())) + .map_err(BlockProcError::TxesStorage)?; + } + + // Return transaction number and whether it was newly added + Ok((txno, existing_txno.is_none())) + } + + /// Finalize block processing by storing block transaction and spend data + /// + /// This method handles the common post-processing steps after all transactions + /// in a block have been processed. + /// + /// # Parameters + /// * `block_id` - ID of the processed block + /// * `block_txs` - Transaction IDs in this block + /// * `block_spends` - UTXOs spent in this block + /// * `txno_counter` - Current transaction number counter to update in the main table + /// + /// # Returns + /// * `Result<(), BlockProcError>` - Success or error + fn finalize_block_processing( + &mut self, + db: &WriteTransaction, + block_id: BlockId, + block_txs: Vec, + block_spends: Vec<(TxNo, u32)>, + txno_counter: TxNo, + ) -> Result<(), BlockProcError> { + // Store all transaction numbers in this block + self.block_txs_table + .insert(block_id, block_txs) + .map_err(|e| BlockProcError::Custom(format!("Block-txs storage error: {}", e)))?; + + // Store UTXOs spent in this block + self.block_spends_table + .insert(block_id, block_spends) + .map_err(|e| BlockProcError::Custom(format!("Block spends storage error: {}", e)))?; + + // Update global counters + let mut main = db + .open_table(TABLE_MAIN) + .map_err(BlockProcError::MainTable)?; + + // Update transaction counter + main.insert(REC_TXNO, txno_counter.to_byte_array().as_slice()) + .map_err(BlockProcError::TxNoUpdate)?; + + Ok(()) + } +} + pub struct BlockProcessor { db: USender, broker: Sender, @@ -56,77 +334,2023 @@ impl BlockProcessor { self.tracking.retain(|filter| !filters.contains(filter)); } + /// Check if a transaction should trigger a notification based on tracking filters + /// + /// # Parameters + /// * `txid_bytes` - Transaction ID bytes to check against filters + /// + /// # Returns + /// * `bool` - Whether notification should be sent + fn should_notify_transaction(&self, txid_bytes: [u8; 32]) -> bool { + if self.tracking.is_empty() { + return false; + } + + for filter in &self.tracking { + if filter.contains(txid_bytes) { + return true; + } + } + + false + } + + // Helper function to calculate block height based on previous block hash + fn calculate_block_height( + &self, + block: &Block, + db: &WriteTransaction, + ) -> Result { + // For genesis block, height is always 0 + // Check for all zeros hash which is the genesis block's prev_hash + let zero_hash = [0u8; 32]; + if block.header.prev_block_hash.to_byte_array() == zero_hash { + return Ok(0); + } + + // Find block height of the previous block and add 1 + // Lookup the block ID for the previous block hash + let blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + + let prev_blockid = blockids_table + .get(block.header.prev_block_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Block ID lookup error: {}", e)))?; + + // If previous block not found, it's an orphan block + if prev_blockid.is_none() { + log::debug!( + target: NAME, + "Orphan block detected: parent block {} not found", + block.header.prev_block_hash + ); + return Err(BlockProcError::OrphanBlock(block.header.prev_block_hash)); + } + + // Get the previous block's ID + let prev_blockid = prev_blockid.unwrap().value(); + + // First check the BlockId to height mapping table which is more efficient + let block_heights_table = db + .open_table(TABLE_BLOCK_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Block heights table error: {}", e)))?; + + let height = block_heights_table + .get(prev_blockid) + .map_err(|e| BlockProcError::Custom(format!("Block height lookup error: {}", e)))? + .map(|v| { + let prev_height = v.value(); + prev_height + 1 + }) + .ok_or_else(|| { + // Parent block has blockid but no height record - this indicates a potential fork + // This typically happens when the parent block is part of a fork chain + let block_hash = block.block_hash(); + let parent_hash = block.header.prev_block_hash; + // Check if parent block is part of a known fork + if let Some(fork_id) = match self.find_fork_by_block_id(db, prev_blockid){ + Ok(Some(id)) => Some(id), + Ok(None) => None, + Err(e) => return e, + } { + // Found the fork - this block extends a fork chain + log::info!( + target: NAME, + "Block {} has parent {} which is part of fork {}", + block_hash, + parent_hash, + fork_id + ); + // Return specialized error for fork chain extension + return BlockProcError::ForkChainExtension(block_hash, parent_hash); + } + // If not part of a fork, it's likely a database inconsistency + log::warn!( + target: NAME, + "Database inconsistency: Block {} has parent {} with ID {} but no height record", + block_hash, + parent_hash, + prev_blockid + ); + BlockProcError::DatabaseInconsistency(block_hash, parent_hash, prev_blockid) + })?; + let heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Heights table error: {}", e)))?; + + // Check if we already have a block at this height + if let Some(existing_blockid) = heights_table + .get(height) + .map_err(|e| BlockProcError::Custom(format!("Heights lookup error: {}", e)))? + .map(|v| v.value()) + { + log::warn!( + target: NAME, + "Detected potential chain fork at height {}: existing block ID {}", + height, + existing_blockid, + ); + + return Err(BlockProcError::PotentialFork( + block.block_hash(), + height, + existing_blockid, + )); + } + Ok(height) + } + + /// Check if the block hash already exists in the database + fn is_block_exists( + &self, + db: &WriteTransaction, + block_hash: &BlockHash, + ) -> Result { + let blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + + let exists = blockids_table + .get(block_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Block hash lookup error: {}", e)))? + .is_some(); + + Ok(exists) + } + pub fn process_block(&mut self, id: BlockHash, block: Block) -> Result { + // Store a copy of the parent hash for potential orphan block handling + let parent_hash = block.header.prev_block_hash; + // Clone the block for potential orphan processing + let block_clone = block.clone(); + + // Regular block processing starts here let (tx, rx) = crossbeam_channel::bounded(1); self.db.send(DbMsg::Write(tx))?; let db = rx.recv()?; - let mut txno = { + // Check if the block already exists + if self.is_block_exists(&db, &id)? { + log::warn!( + target: NAME, + "Block {} already exists in database, skipping processing", + id + ); + return Err(BlockProcError::Custom(format!("Block {} already exists", id))); + } + + // Get current transaction number + let mut txno_counter = { let main = db .open_table(TABLE_MAIN) .map_err(BlockProcError::MainTable)?; - let rec = main - .get(REC_TXNO) - .map_err(BlockProcError::TxNoAbsent)? - .unwrap(); - TxNo::from_slice(rec.value()).map_err(BlockProcError::TxNoInvalid)? + + // Get current transaction number or use starting value if not found + match main.get(REC_TXNO).map_err(BlockProcError::TxNoAbsent)? { + Some(rec) => TxNo::from_slice(rec.value()).map_err(BlockProcError::TxNoInvalid)?, + None => TxNo::start(), + } }; let mut count = 0; let process = || -> Result<(), BlockProcError> { - let mut table = db + // Calculate the block height based on previous block + // This function will also detect orphan blocks and potential forks + let height = self.calculate_block_height(&block, &db)?; + + let blockid = self.get_next_block_id(&db)?; + + // Initialize the transaction tables context + let mut tx_ctx = TxTablesContext::new(&db)?; + let mut blocks_table = db .open_table(TABLE_BLKS) .map_err(BlockProcError::BlockTable)?; - table - .insert(id.to_byte_array(), DbBlockHeader::from(block.header)) + + let mut blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + + let mut heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(BlockProcError::HeightsTable)?; + + let mut block_heights_table = db + .open_table(TABLE_BLOCK_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Block heights table error: {}", e)))?; + + // Store block header + blocks_table + .insert(blockid, DbBlockHeader::from(block.header)) .map_err(BlockProcError::BlockStorage)?; + // Map block hash to block ID + blockids_table + .insert(id.to_byte_array(), blockid) + .map_err(|e| BlockProcError::Custom(format!("Block ID storage error: {}", e)))?; + + log::debug!( + target: NAME, + "Processing block {} at height {} with internal ID {}", + id, + height, + blockid + ); + + heights_table + .insert(height, blockid) + .map_err(|e| BlockProcError::Custom(format!("Heights storage error: {}", e)))?; + + // Also update the reverse mapping (blockid -> height) + block_heights_table.insert(blockid, height).map_err(|e| { + BlockProcError::Custom(format!("Block height storage error: {}", e)) + })?; + + // Track UTXOs spent in this block + let mut block_spends = Vec::new(); + + // Track all transactions in this block + let mut block_txs = Vec::new(); + + // Process transactions in the block for tx in block.transactions { + let _ = tx_ctx.process_transaction( + &tx, + blockid, + &mut txno_counter, + &mut block_txs, + &mut block_spends, + )?; + + // Check if transaction ID is in tracking list and notify if needed let txid = tx.txid(); - txno.inc_assign(); - - let mut table = db - .open_table(TABLE_TXIDS) - .map_err(BlockProcError::TxidTable)?; - table - .insert(txid.to_byte_array(), txno) - .map_err(BlockProcError::TxidStorage)?; - - // TODO: Add remaining transaction information to other database tables - - let mut table = db - .open_table(TABLE_TXES) - .map_err(BlockProcError::TxesTable)?; - table - .insert(txno, DbTx::from(tx)) - .map_err(BlockProcError::TxesStorage)?; - - // TODO: If txid match `tracking` Bloom filters, send information to the broker - if false { + let txid_bytes = txid.to_byte_array(); + if self.should_notify_transaction(txid_bytes) { self.broker.send(ImporterMsg::Mined(txid))?; } count += 1; } - let mut main = db - .open_table(TABLE_MAIN) - .map_err(BlockProcError::MainTable)?; - main.insert(REC_TXNO, txno.to_byte_array().as_slice()) - .map_err(BlockProcError::TxNoUpdate)?; + // Finalize the block processing + tx_ctx.finalize_block_processing( + &db, + blockid, + block_txs, + block_spends, + txno_counter, + )?; + + // Log successful block processing + log::debug!( + target: NAME, + "Successfully processed block {} at height {} with {} transactions", + id, + height, + count + ); Ok(()) }; + + match process() { + Ok(()) => { + db.commit()?; + + log::debug!( + target: NAME, + "Successfully processed block {} with {} transactions", + id, + count + ); + + Ok(count) + } + Err(BlockProcError::OrphanBlock(e)) => { + // Handle orphan block case + if let Err(err) = db.abort() { + log::warn!(target: NAME, "Unable to abort failed database transaction due to {err}"); + }; + + // Save the orphan block for later processing + log::info!( + target: NAME, + "Orphan block detected: Parent block {} not found for block {}", + parent_hash, + id + ); + + self.save_orphan_block(id, block_clone)?; + Err(BlockProcError::OrphanBlock(e)) + } + Err(BlockProcError::PotentialFork(new_block_hash, height, existing_blockid)) => { + // Handle potential fork case - conflict with existing block at same height + if let Err(err) = db.abort() { + log::warn!(target: NAME, "Unable to abort failed database transaction due to {err}"); + }; + + // Record this as a potential fork for later verification + // Store the new block but don't update the height tables yet + // We'll only perform a reorganization if this fork becomes the longest chain + let result = self.process_potential_fork( + id, + &block_clone, + Some(height), + Some(existing_blockid), + )?; + + debug_assert!(result.is_none()); + + Err(BlockProcError::PotentialFork(new_block_hash, height, existing_blockid)) + } + Err(BlockProcError::ForkChainExtension(block_hash, parent_hash)) => { + // Handle fork chain extension case - parent block is part of a fork + if let Err(err) = db.abort() { + log::warn!(target: NAME, "Unable to abort failed database transaction due to {err}"); + }; + + log::info!( + target: NAME, + "Processing block {} as fork chain extension with parent {}", + block_hash, + parent_hash + ); + + // If a chain reorganization occurs, return the number of transactions added + if let Some(txs_added) = + self.process_potential_fork(id, &block_clone, None, None)? + { + return Ok(txs_added); + } + + Err(BlockProcError::ForkChainExtension(block_hash, parent_hash)) + } + Err(e) => { + // Handle other errors + if let Err(err) = db.abort() { + log::warn!(target: NAME, "Unable to abort failed database transaction due to {err}"); + }; + Err(e) + } + } + } + + /// Process a block and all its dependent orphans in an iterative manner to avoid stack + /// overflow. + /// + /// This method should be used instead of directly calling `process_block` when you want to + /// ensure that orphan blocks dependent on the processed block are also handled. + pub fn process_block_and_orphans( + &mut self, + id: BlockHash, + block: Block, + ) -> Result { + // Create a queue to store blocks that need to be processed + // Store (block_hash, block, parent_hash) tuples + let mut pending_blocks = std::collections::VecDeque::new(); + pending_blocks.push_back((id, block, None)); + + let mut total_processed = 0; + + // Process blocks in a loop rather than recursive calls + while let Some((current_id, current_block, parent_hash)) = pending_blocks.pop_front() { + // Process the current block + match self.process_block(current_id, current_block) { + Ok(count) => { + total_processed += count; + + // If this was an orphan block (has a parent_hash), remove it from the orphan + // pool + if let Some(parent) = parent_hash { + // Only remove this specific orphan after successful processing + if let Err(e) = self.remove_processed_orphans(parent, &[current_id]) { + log::warn!( + target: NAME, + "Failed to remove processed orphan {}: {}", + current_id, + e + ); + } else { + log::info!( + target: NAME, + "Successfully removed processed orphan {} from pool", + current_id + ); + } + } + + // Find orphans that depend on this block + if let Ok(orphans) = self.find_dependent_orphans(current_id) { + // Skip if no orphans found + if !orphans.is_empty() { + // Add them to the queue for processing + for (orphan_id, orphan_block) in orphans { + log::info!( + target: NAME, + "Adding orphan block {} to processing queue", + orphan_id + ); + + // Add to the queue for processing + // Include the parent hash so we can remove it from orphan pool + // after processing + pending_blocks.push_back(( + orphan_id, + orphan_block, + Some(current_id), + )); + } + } + } + } + Err(e) => { + // For orphan blocks, we just continue with the next block + if let BlockProcError::OrphanBlock(_) = e { + log::debug!( + target: NAME, + "Orphan block {} will be processed later when its parent is available", + current_id + ); + continue; + } + + // For other errors, log and return the error + log::error!( + target: NAME, + "Error processing block {}: {}", + current_id, + e + ); + return Err(e); + } + } + } + + Ok(total_processed) + } + + // Helper method to find orphans that depend on a specific block + fn find_dependent_orphans( + &self, + parent_id: BlockHash, + ) -> Result, BlockProcError> { + // First check if we have any orphans that depend on this block + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Read(tx))?; + let db = rx.recv()?; + + // Check orphan parents table + let orphan_parents_table = db + .open_table(TABLE_ORPHAN_PARENTS) + .map_err(|e| BlockProcError::Custom(format!("Orphan parents table error: {}", e)))?; + + let parent_hash = parent_id.to_byte_array(); + let orphans = orphan_parents_table + .get(parent_hash) + .map_err(|e| BlockProcError::Custom(format!("Orphan parents lookup error: {}", e)))?; + + // If no orphans depend on this block, return empty list + if orphans.is_none() { + return Ok(Vec::new()); + } + + // Get list of orphan block hashes + let orphan_hashes = orphans.unwrap().value(); + + // Get orphans data + let orphans_table = db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + let mut dependent_orphans = Vec::with_capacity(orphan_hashes.len()); + + for orphan_hash in &orphan_hashes { + // Get the orphan block data + if let Some(orphan_block_data) = orphans_table + .get(orphan_hash) + .map_err(|e| BlockProcError::Custom(format!("Orphan lookup error: {}", e)))? + { + let (block_data, _timestamp) = orphan_block_data.value(); + + // Extract the Block object and create a BlockHash + let block = Block::from(block_data); + let block_hash = BlockHash::from_byte_array(*orphan_hash); + debug_assert_eq!(block.block_hash(), block_hash); + + dependent_orphans.push((block_hash, block)); + + log::info!( + target: NAME, + "Found orphan block {} with parent {}", + block_hash, + parent_id + ); + } + } + + // We don't remove orphans here - they'll be removed after successful processing + if !dependent_orphans.is_empty() { + log::info!( + target: NAME, + "Found {} orphan blocks dependent on block {}", + dependent_orphans.len(), + parent_id + ); + } + + Ok(dependent_orphans) + } + + // Modified to remove orphans after they've been processed + fn remove_processed_orphans( + &mut self, + parent_id: BlockHash, + processed: &[BlockHash], + ) -> Result<(), BlockProcError> { + if processed.is_empty() { + return Ok(()); + } + + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Write(tx))?; + let write_db = rx.recv()?; + + let remove_orphans = || -> Result<(), BlockProcError> { + // Remove from orphan parents table + let mut orphan_parents_table = + write_db.open_table(TABLE_ORPHAN_PARENTS).map_err(|e| { + BlockProcError::Custom(format!("Orphan parents table error: {}", e)) + })?; + + // Get the current list of orphans for this parent + let parent_hash = parent_id.to_byte_array(); + + // Get orphan list and immediately convert to Vec to drop the borrow + let orphan_hashes = { + let orphans = orphan_parents_table.get(parent_hash).map_err(|e| { + BlockProcError::Custom(format!("Orphan parents lookup error: {}", e)) + })?; + + if let Some(orphans_record) = orphans { + orphans_record.value().to_vec() + } else { + // No orphans found for this parent, nothing to do + return Ok(()); + } + }; + + // Filter out processed orphans + let remaining_orphans: Vec<[u8; 32]> = orphan_hashes + .into_iter() + .filter(|h| !processed.iter().any(|p| p.to_byte_array() == *h)) + .collect(); + + // Update or remove the entry + if remaining_orphans.is_empty() { + orphan_parents_table.remove(parent_hash).map_err(|e| { + BlockProcError::Custom(format!("Orphan parents removal error: {}", e)) + })?; + + log::debug!( + target: NAME, + "Removed all orphans for parent block {}", + parent_id + ); + } else { + orphan_parents_table + .insert(parent_hash, remaining_orphans) + .map_err(|e| BlockProcError::Custom(format!("Parent update error: {}", e)))?; + + log::debug!( + target: NAME, + "Updated orphan list for parent block {}", + parent_id + ); + } + + // Remove from orphans table + let mut orphans_table = write_db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + for orphan_hash in processed { + orphans_table + .remove(orphan_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Orphan removal error: {}", e)))?; + + log::debug!( + target: NAME, + "Removed orphan block {} from orphans table", + orphan_hash + ); + } + + Ok(()) + }; + + if let Err(e) = remove_orphans() { + if let Err(err) = write_db.abort() { + log::warn!( + target: NAME, + "Unable to abort failed orphan cleanup transaction due to {err}" + ); + } + return Err(e); + } + + write_db.commit()?; + + log::info!( + target: NAME, + "Successfully removed {} processed orphan blocks", + processed.len() + ); + + Ok(()) + } + + // Save an orphan block for later processing + fn save_orphan_block(&self, id: BlockHash, block: Block) -> Result { + log::info!( + target: NAME, + "Saving orphan block {} with parent {} for later processing", + id, + block.header.prev_block_hash + ); + + // First, check if we should clean up old orphans + self.clean_expired_orphans()?; + + // Then check if we have too many orphans + if self.count_orphans()? >= MAX_ORPHAN_BLOCKS { + log::warn!( + target: NAME, + "Orphan block limit reached ({}). Rejecting new orphan block {}", + MAX_ORPHAN_BLOCKS, + id + ); + // Simply ignore this orphan block + return Ok(0); + } + + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Write(tx))?; + let db = rx.recv()?; + + let process = || -> Result<(), BlockProcError> { + // Get the current timestamp for expiry tracking + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + let parent_hash = block.header.prev_block_hash.to_byte_array(); + + // Store the orphan block + let mut orphans_table = db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + orphans_table + .insert(id.to_byte_array(), (DbBlock::from(block), now)) + .map_err(|e| BlockProcError::Custom(format!("Orphan storage error: {}", e)))?; + + // Index by parent hash to allow quick lookup when parent is processed + let mut orphan_parents_table = db.open_table(TABLE_ORPHAN_PARENTS).map_err(|e| { + BlockProcError::Custom(format!("Orphan parents table error: {}", e)) + })?; + + // Get existing orphans with the same parent, if any + let mut orphan_list = orphan_parents_table + .get(parent_hash) + .map_err(|e| BlockProcError::Custom(format!("Orphan parents lookup error: {}", e)))? + .map(|v| v.value().to_vec()) + .unwrap_or_default(); + + // Add this orphan to the list + orphan_list.push(id.to_byte_array()); + + // Update the orphan parents table + orphan_parents_table + .insert(parent_hash, orphan_list) + .map_err(|e| { + BlockProcError::Custom(format!("Orphan parents update error: {}", e)) + })?; + + Ok(()) + }; + if let Err(e) = process() { if let Err(err) = db.abort() { - log::warn!(target: NAME, "Unable to abort failed database transaction due to {err}"); + log::warn!( + target: NAME, + "Unable to abort failed orphan block storage transaction due to {err}" + ); }; return Err(e); } + db.commit()?; + log::info!( + target: NAME, + "Successfully saved orphan block {} for later processing", + id + ); + + // Return 0 since we didn't process any transactions yet + Ok(0) + } + + // Count total number of orphan blocks + fn count_orphans(&self) -> Result { + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Read(tx))?; + let db = rx.recv()?; + + let orphans_table = db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + let count: usize = orphans_table + .len() + .map_err(|e| BlockProcError::Custom(format!("Failed to count orphans: {}", e)))? + as usize; + Ok(count) } + + // Remove orphan blocks that have been in the pool for too long + fn clean_expired_orphans(&self) -> Result<(), BlockProcError> { + log::debug!(target: NAME, "Checking for expired orphan blocks..."); + + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Read(tx))?; + let db = rx.recv()?; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + // Calculate expiry threshold + let expiry_secs = ORPHAN_EXPIRY_HOURS * 3600; + let expiry_threshold = now.saturating_sub(expiry_secs); + + // Find expired orphans + let orphans_table = db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + let mut expired_orphans = Vec::new(); + + // Scan all orphans + let orphans_iter = orphans_table.iter().map_err(|e| { + BlockProcError::Custom(format!("Failed to iterate orphans table: {}", e)) + })?; + + for orphan_entry in orphans_iter { + let (orphan_hash, data) = orphan_entry.map_err(|e| { + BlockProcError::Custom(format!("Failed to read orphan entry: {}", e)) + })?; + + let (_block_data, timestamp) = data.value(); + + // Check if orphan has expired + if timestamp < expiry_threshold { + expired_orphans.push(orphan_hash.value()); + } + } + + if !expired_orphans.is_empty() { + log::info!( + target: NAME, + "Found {} expired orphan blocks to clean up", + expired_orphans.len() + ); + + let (write_tx, write_rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Write(write_tx))?; + let write_db = write_rx.recv()?; + + let remove_expired = || -> Result<(), BlockProcError> { + let mut orphans_table = write_db + .open_table(TABLE_ORPHANS) + .map_err(|e| BlockProcError::Custom(format!("Orphans table error: {}", e)))?; + + let mut orphan_parents_table = + write_db.open_table(TABLE_ORPHAN_PARENTS).map_err(|e| { + BlockProcError::Custom(format!("Orphan parents table error: {}", e)) + })?; + + for orphan_hash in &expired_orphans { + // Remove from orphans table + orphans_table.remove(orphan_hash).map_err(|e| { + BlockProcError::Custom(format!("Orphan removal error: {}", e)) + })?; + + // Also need to remove from parent mappings + // This is more complex as we need to scan all parent entries + let parents_iter = orphan_parents_table.iter().map_err(|e| { + BlockProcError::Custom(format!("Failed to iterate orphan parents: {}", e)) + })?; + + // First collect all parents to scan + let mut parents_to_scan = Vec::new(); + + for parent_entry in parents_iter { + let (parent_hash, orphans) = parent_entry.map_err(|e| { + BlockProcError::Custom(format!("Failed to read parent entry: {}", e)) + })?; + + // Store parent data for later processing + parents_to_scan.push((parent_hash.value(), orphans.value().to_vec())); + } + + // Now process parents without borrowing the table + for (parent_hash, orphans_list) in parents_to_scan { + // We need to iterate each orphan hash in the list and check if it + // matches our target + let mut found = false; + for list_hash in &orphans_list { + // Convert both to slices for comparison + if list_hash == orphan_hash { + found = true; + break; + } + } + + if found { + // Remove this orphan from the list + let updated_list: Vec<[u8; 32]> = orphans_list + .into_iter() + .filter(|h| h != orphan_hash) + .collect(); + + if updated_list.is_empty() { + // If no orphans left for this parent, remove the entry + orphan_parents_table.remove(parent_hash).map_err(|e| { + BlockProcError::Custom(format!("Parent removal error: {}", e)) + })?; + } else { + // Otherwise update with the filtered list + orphan_parents_table + .insert(parent_hash, updated_list) + .map_err(|e| { + BlockProcError::Custom(format!( + "Parent update error: {}", + e + )) + })?; + } + } + } + } + + Ok(()) + }; + + if let Err(e) = remove_expired() { + if let Err(err) = write_db.abort() { + log::warn!( + target: NAME, + "Unable to abort failed orphan cleanup transaction due to {err}" + ); + } + return Err(e); + } + + write_db.commit()?; + + log::info!( + target: NAME, + "Successfully removed {} expired orphan blocks", + expired_orphans.len() + ); + } + + Ok(()) + } + + /// Process a block that might create a fork in the blockchain. + /// This method records fork information and checks if we need to perform a chain + /// reorganization. + fn process_potential_fork( + &mut self, + block_hash: BlockHash, + block: &Block, + height: Option, + existing_blockid: Option, + ) -> Result, BlockProcError> { + let (tx, rx) = crossbeam_channel::bounded(1); + self.db.send(DbMsg::Write(tx))?; + let db = rx.recv()?; + + // Get a new block ID for this fork block + let new_blockid = self.get_next_block_id(&db)?; + + { + // Store the block header + let mut blocks_table = db + .open_table(TABLE_BLKS) + .map_err(BlockProcError::BlockTable)?; + blocks_table + .insert(new_blockid, DbBlockHeader::from(block.header)) + .map_err(BlockProcError::BlockStorage)?; + + // Store the complete block data in the fork blocks table + let mut fork_blocks_table = db + .open_table(TABLE_FORK_BLOCKS) + .map_err(|e| BlockProcError::Custom(format!("Fork blocks table error: {}", e)))?; + fork_blocks_table + .insert(new_blockid, DbBlock::from(block.clone())) + .map_err(|e| BlockProcError::Custom(format!("Fork block storage error: {}", e)))?; + + // Map block hash to the assigned block ID + let mut blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + blockids_table + .insert(block_hash.to_byte_array(), new_blockid) + .map_err(|e| BlockProcError::Custom(format!("Block ID storage error: {}", e)))?; + } + + // First step: Check if this block extends an existing fork + // Find the parent block ID + let parent_block_id = self + .find_block_id_by_hash(&db, block.header.prev_block_hash)? + .ok_or(BlockProcError::Custom(format!( + "Parent block not found: {}", + block.header.prev_block_hash + )))?; + let fork_id = if let Some(parent_fork_id) = + self.find_fork_by_block_id(&db, parent_block_id)? + { + // This block extends an existing fork + log::info!( + target: NAME, + "Block {} extends existing fork {}", + block_hash, + parent_fork_id + ); + + // Update the fork with this new block + self.update_fork(&db, parent_fork_id, new_blockid)?; + + parent_fork_id + } else { + // This block might start a new fork + // First check if its parent is in the main chain + if !self.is_block_in_main_chain(&db, block.header.prev_block_hash)? { + // Parent block is not in main chain and not in a known fork + log::warn!( + target: NAME, + "Block {} is disconnected: parent {} not found in main chain or forks", + block_hash, + block.header.prev_block_hash + ); + return Ok(None); + } + + self.record_fork( + &db, + height + .ok_or(BlockProcError::Custom("Height is required for new fork".to_string()))?, + existing_blockid.ok_or(BlockProcError::Custom( + "Existing block ID is required for new fork".to_string(), + ))?, + new_blockid, + block_hash, + )? + }; + + // Check if this fork is now longer than the main chain + let txs_added = self.check_fork_length(&db, fork_id)?; + + db.commit()?; + + Ok(txs_added) + } + + /// Check if a fork is longer than the main chain and perform reorganization if needed + fn check_fork_length( + &mut self, + db: &WriteTransaction, + fork_id: ForkId, + ) -> Result, BlockProcError> { + // Get fork information + let (_fork_start_height, _fork_start_block_id, _fork_tip_id, fork_height) = + self.get_fork_info(db, fork_id)?; + + // Get main chain height + let main_chain_height = self.get_main_chain_height(db)?; + + // If fork is longer than main chain, perform reorganization + if fork_height > main_chain_height { + log::info!( + target: NAME, + "Fork {} is longer than main chain ({} > {}), initiating chain reorganization", + fork_id, + fork_height, + main_chain_height + ); + + // Perform chain reorganization + let txs_added = self.perform_chain_reorganization(db, fork_id)?; + return Ok(Some(txs_added)); + } else { + log::debug!( + target: NAME, + "Fork {} is not longer than main chain ({} <= {}), no reorganization needed", + fork_id, + fork_height, + main_chain_height + ); + } + + Ok(None) + } + + /// Perform a chain reorganization to adopt a fork as the new main chain + fn perform_chain_reorganization( + &mut self, + db: &WriteTransaction, + fork_id: ForkId, + ) -> Result { + // Get fork information + let (fork_start_height, _fork_start_block_id, fork_tip_id, fork_height) = + self.get_fork_info(db, fork_id)?; + + log::info!( + target: NAME, + "Starting chain reorganization: Fork {} from height {} to {} with tip block {}", + fork_id, + fork_start_height, + fork_height, + fork_tip_id + ); + + // 1. Find the common ancestor + let common_ancestor_height = fork_start_height; + + // 2. Get blocks to rollback from main chain + let main_chain_height = self.get_main_chain_height(db)?; + let blocks_to_rollback = + self.get_blocks_to_rollback(db, common_ancestor_height, main_chain_height)?; + + // 3. Get blocks to apply from fork chain + let blocks_to_apply = + self.get_blocks_to_apply(db, fork_id, common_ancestor_height, fork_height)?; + + log::info!( + target: NAME, + "Chain reorganization: rolling back {} blocks and applying {} blocks", + blocks_to_rollback.len(), + blocks_to_apply.len() + ); + + // 4. Roll back blocks from main chain + self.rollback_blocks(db, &blocks_to_rollback)?; + + // 5. Apply blocks from fork chain + let txs_added = self.apply_blocks(db, &blocks_to_apply)?; + + // 6. Update fork status + self.cleanup_after_reorg(db, fork_id)?; + + log::info!( + target: NAME, + "Chain reorganization complete: new chain height is {}", + fork_height + ); + + Ok(txs_added) + } + + /// Records a potential fork in the blockchain. + /// This happens when we discover two different blocks at the same height. + fn record_fork( + &self, + db: &WriteTransaction, + height: u32, + existing_blockid: BlockId, + new_blockid: BlockId, + new_block_hash: BlockHash, + ) -> Result { + // Check if this block is already part of a known fork + if let Some(fork_id) = self.find_fork_by_block_id(db, new_blockid)? { + log::debug!( + target: NAME, + "Block {} at height {} is already part of fork {}", + new_block_hash, + height, + fork_id + ); + return Ok(fork_id); + } + + // Generate a new fork ID + let fork_id = self.get_next_fork_id(db)?; + + // Record the fork in the forks table + let mut forks_table = db + .open_table(TABLE_FORKS) + .map_err(|e| BlockProcError::Custom(format!("Forks table error: {}", e)))?; + + // A fork starts at the current height with the current block + // Parameters: (fork_start_height, fork_start_block_id, tip_block_id, current_height) + forks_table + .insert(fork_id, (height, existing_blockid, new_blockid, height)) + .map_err(|e| BlockProcError::Custom(format!("Fork insertion error: {}", e)))?; + + // Map the fork tip block ID to the fork ID + let mut fork_tips_table = db + .open_table(TABLE_FORK_TIPS) + .map_err(|e| BlockProcError::Custom(format!("Fork tips table error: {}", e)))?; + + fork_tips_table + .insert(new_blockid, fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip mapping error: {}", e)))?; + + log::info!( + target: NAME, + "Created new fork {} at height {}: Main chain block {} vs Fork block {}", + fork_id, + height, + existing_blockid, + new_blockid + ); + + Ok(fork_id) + } + + /// Gets the next available block ID and increments the counter + fn get_next_block_id(&self, db: &WriteTransaction) -> Result { + let mut main = db + .open_table(TABLE_MAIN) + .map_err(BlockProcError::MainTable)?; + let mut block_id = match main + .get(REC_BLOCKID) + .map_err(|e| BlockProcError::Custom(format!("Block ID lookup error: {}", e)))? + { + Some(rec) => BlockId::from_bytes(rec.value()), + None => BlockId::start(), + }; + + block_id.inc_assign(); + main.insert(REC_BLOCKID, block_id.to_bytes().as_slice()) + .map_err(|e| BlockProcError::Custom(format!("Block ID update error: {}", e)))?; + + Ok(block_id) + } + + /// Gets the next available fork ID and increments the counter + fn get_next_fork_id(&self, db: &WriteTransaction) -> Result { + let mut main = db + .open_table(TABLE_MAIN) + .map_err(BlockProcError::MainTable)?; + + let mut fork_id = { + match main + .get(REC_FORK_ID) + .map_err(|e| BlockProcError::Custom(format!("Fork ID lookup error: {}", e)))? + { + Some(rec) => ForkId::from_bytes(rec.value()), + None => ForkId::start(), + } + }; + fork_id.inc_assign(); + main.insert(REC_FORK_ID, fork_id.to_bytes().as_slice()) + .map_err(|e| BlockProcError::Custom(format!("Fork ID update error: {}", e)))?; + + Ok(fork_id) + } + + /// Find block ID by block hash + fn find_block_id_by_hash( + &self, + db: &WriteTransaction, + block_hash: BlockHash, + ) -> Result, BlockProcError> { + let blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(BlockProcError::BlockTable)?; + let block_id = blockids_table + .get(block_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Block ID lookup error: {}", e)))?; + if let Some(record) = block_id { Ok(Some(record.value())) } else { Ok(None) } + } + + /// Find fork ID by block hash + fn find_fork_by_block_id( + &self, + db: &WriteTransaction, + block_id: BlockId, + ) -> Result, BlockProcError> { + let fork_tips_table = db + .open_table(TABLE_FORK_TIPS) + .map_err(|e| BlockProcError::Custom(format!("Fork tips table error: {}", e)))?; + + if let Some(fork_id_record) = fork_tips_table + .get(block_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip lookup error: {}", e)))? + { + return Ok(Some(fork_id_record.value())); + } + + Ok(None) + } + + /// Update fork information with a new block + fn update_fork( + &self, + db: &WriteTransaction, + fork_id: ForkId, + new_block_id: BlockId, + ) -> Result<(), BlockProcError> { + // Get current fork info + let (start_height, start_block_id, old_tip_id, current_height) = + self.get_fork_info(db, fork_id)?; + let new_height = current_height + 1; + + // Update the fork record + let mut forks_table = db + .open_table(TABLE_FORKS) + .map_err(|e| BlockProcError::Custom(format!("Forks table error: {}", e)))?; + + // Update fork with new tip and height + forks_table + .insert(fork_id, (start_height, start_block_id, new_block_id, new_height)) + .map_err(|e| BlockProcError::Custom(format!("Fork update error: {}", e)))?; + + // Update the fork tip mapping + let mut fork_tips_table = db + .open_table(TABLE_FORK_TIPS) + .map_err(|e| BlockProcError::Custom(format!("Fork tips table error: {}", e)))?; + + // Remove old tip mapping if it exists + fork_tips_table + .remove(old_tip_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip removal error: {}", e)))?; + + // Add new tip mapping + fork_tips_table + .insert(new_block_id, fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip mapping error: {}", e)))?; + + log::debug!( + target: NAME, + "Updated fork {}: new height {}, new tip {}", + fork_id, + new_height, + new_block_id + ); + + Ok(()) + } + + /// Get the current height of the main chain + fn get_main_chain_height(&self, db: &WriteTransaction) -> Result { + // Find the maximum height in the heights table + let heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Heights table error: {}", e)))?; + + let mut max_height = 0; + let iter = heights_table + .iter() + .map_err(|e| BlockProcError::Custom(format!("Heights iterator error: {}", e)))?; + + for entry in iter { + let (height, _) = + entry.map_err(|e| BlockProcError::Custom(format!("Heights entry error: {}", e)))?; + + let h = height.value(); + if h > max_height { + max_height = h; + } + } + + Ok(max_height) + } + + /// Check if a block with the given hash is in the main chain + fn is_block_in_main_chain( + &self, + db: &WriteTransaction, + block_hash: BlockHash, + ) -> Result { + // Look up the block ID + let blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + + let block_id = match blockids_table + .get(block_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Block ID lookup error: {}", e)))? + { + Some(id_record) => id_record.value(), + None => return Ok(false), // Block not found + }; + + // Check if this block ID has a height entry + let block_heights_table = db + .open_table(TABLE_BLOCK_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Block heights table error: {}", e)))?; + + if block_heights_table + .get(block_id) + .map_err(|e| BlockProcError::Custom(format!("Block height lookup error: {}", e)))? + .is_some() + { + return Ok(true); // Block has a height, so it's in the main chain + } + + Ok(false) + } + + /// Get blocks that need to be rolled back from the main chain + /// Returns a list of (height, block_id) pairs, from highest to lowest height + fn get_blocks_to_rollback( + &self, + db: &WriteTransaction, + start_height: u32, + end_height: u32, + ) -> Result, BlockProcError> { + let mut blocks_to_rollback = Vec::new(); + + let heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Heights table error: {}", e)))?; + + // We need to roll back from highest to lowest height + for height in (start_height..=end_height).rev() { + if let Some(block_id_record) = heights_table + .get(height) + .map_err(|e| BlockProcError::Custom(format!("Heights lookup error: {}", e)))? + { + blocks_to_rollback.push((height, block_id_record.value())); + } + } + + log::debug!( + target: NAME, + "Found {} blocks to roll back from heights {} to {}", + blocks_to_rollback.len(), + start_height, + end_height + ); + + Ok(blocks_to_rollback) + } + + /// Get blocks that need to be applied from the fork chain + /// Returns a list of (height, block_id) pairs, from lowest to highest height + fn get_blocks_to_apply( + &self, + db: &WriteTransaction, + fork_id: ForkId, + start_height: u32, + end_height: u32, + ) -> Result, BlockProcError> { + // Find the blocks in the fork that need to be applied + // This is more complex as fork blocks aren't in the heights table yet + + // Get the tip block ID of the fork + let (_fork_start_height, _fork_start_block_id, fork_tip_id, fork_height) = + self.get_fork_info(db, fork_id)?; + + // We need to find all blocks from the tip down to the start height + // Since they're not yet in the heights table, we need to traverse backwards + + // Start with the tip block + let mut current_height = fork_height; + let mut current_block_id = fork_tip_id; + + // Collect blocks (from high to low) + let mut temp_blocks = Vec::new(); + + let blks_table = db + .open_table(TABLE_BLKS) + .map_err(|e| BlockProcError::Custom(format!("Blocks table error: {}", e)))?; + + let blockids_table = db + .open_table(TABLE_BLOCKIDS) + .map_err(|e| BlockProcError::Custom(format!("Block IDs table error: {}", e)))?; + + while current_height >= start_height { + temp_blocks.push((current_height, current_block_id)); + + if current_height == start_height { + break; + } + + // Find the parent of this block + let block_header = match blks_table + .get(current_block_id) + .map_err(|e| BlockProcError::Custom(format!("Block lookup error: {}", e)))? + { + Some(record) => record.value(), + None => { + return Err(BlockProcError::Custom(format!( + "Block with ID {} not found in database", + current_block_id + ))); + } + }; + + let prev_hash = block_header.as_ref().prev_block_hash; + + // Find the block ID for this hash + let prev_block_id = match blockids_table + .get(prev_hash.to_byte_array()) + .map_err(|e| BlockProcError::Custom(format!("Block ID lookup error: {}", e)))? + { + Some(record) => record.value(), + None => { + return Err(BlockProcError::Custom(format!( + "Previous block with hash {} not found in database", + prev_hash + ))); + } + }; + + current_block_id = prev_block_id; + current_height -= 1; + } + + // Reverse to get blocks from low to high + let blocks_to_apply: Vec<(u32, crate::db::Id)> = temp_blocks.into_iter().rev().collect(); + + log::debug!( + target: NAME, + "Found {} blocks to apply from heights {} to {}", + blocks_to_apply.len(), + start_height, + end_height + ); + + Ok(blocks_to_apply) + } + + /// Roll back blocks from the main chain + fn rollback_blocks( + &self, + db: &WriteTransaction, + blocks: &[(u32, BlockId)], + ) -> Result<(), BlockProcError> { + if blocks.is_empty() { + return Ok(()); + } + + let mut total_txs_removed = 0; + let mut total_utxos_restored = 0; + let mut total_utxos_removed = 0; + let mut total_spk_entries_cleaned = 0; + let mut total_inputs_cleaned = 0; + let mut total_outs_refs_cleaned = 0; + + let block_spends_table = db + .open_table(TABLE_BLOCK_SPENDS) + .map_err(|e| BlockProcError::Custom(format!("Block spends table error: {}", e)))?; + + let mut utxos_table = db + .open_table(TABLE_UTXOS) + .map_err(|e| BlockProcError::Custom(format!("UTXOs table error: {}", e)))?; + + let block_txs_table = db + .open_table(TABLE_BLOCK_TXS) + .map_err(|e| BlockProcError::Custom(format!("Block-txs table error: {}", e)))?; + + let txes_table = db + .open_table(TABLE_TXES) + .map_err(|e| BlockProcError::Custom(format!("Txes table error: {}", e)))?; + + let mut heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Heights table error: {}", e)))?; + + let mut block_heights_table = db + .open_table(TABLE_BLOCK_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Block heights table error: {}", e)))?; + + // Open the SPKs table for script pubkey cleanup during rollback + let mut spks_table = db + .open_table(TABLE_SPKS) + .map_err(|e| BlockProcError::Custom(format!("SPKs table error: {}", e)))?; + + // Open the inputs table to clean up input references + let mut inputs_table = db + .open_table(TABLE_INPUTS) + .map_err(|e| BlockProcError::Custom(format!("Inputs table error: {}", e)))?; + + // Open the outs table to clean up spending relationships + let mut outs_table = db + .open_table(TABLE_OUTS) + .map_err(|e| BlockProcError::Custom(format!("Outs table error: {}", e)))?; + + // Open tx_blocks table to clean up transaction-block associations + let mut tx_blocks_table = db + .open_table(TABLE_TX_BLOCKS) + .map_err(|e| BlockProcError::Custom(format!("TX-Blocks table error: {}", e)))?; + + // Iterate through blocks to roll back (should be in descending height order) + for &(height, block_id) in blocks { + log::info!( + target: NAME, + "Rolling back block at height {}: block ID {}", + height, + block_id + ); + + let mut block_utxos_restored = 0; + let mut block_utxos_removed = 0; + let mut block_txs_removed = 0; + let mut block_spk_entries_cleaned = 0; + let mut block_inputs_cleaned = 0; + let mut block_outs_refs_cleaned = 0; + + // 1. Restore UTXOs spent in this block + if let Some(spends_record) = block_spends_table + .get(block_id) + .map_err(|e| BlockProcError::Custom(format!("Block spends lookup error: {}", e)))? + { + let spends = spends_record.value(); + block_utxos_restored = spends.len(); + total_utxos_restored += block_utxos_restored; + + // Restore each spent UTXO + for (txno, vout) in spends { + utxos_table.insert((txno, vout), ()).map_err(|e| { + BlockProcError::Custom(format!("UTXO restoration error: {}", e)) + })?; + + log::debug!( + target: NAME, + "Restored UTXO: txno={}, vout={}", + txno, + vout + ); + } + } + + // 2. Find all transactions in this block + if let Some(txs_record) = block_txs_table + .get(block_id) + .map_err(|e| BlockProcError::Custom(format!("Block-txs lookup error: {}", e)))? + { + let txs = txs_record.value(); + block_txs_removed = txs.len(); + total_txs_removed += block_txs_removed; + + // For each transaction + for txno in txs { + // 3. Remove UTXOs created by this transaction + if let Some(tx_record) = txes_table + .get(txno) + .map_err(|e| BlockProcError::Custom(format!("Tx lookup error: {}", e)))? + { + let tx = tx_record.value(); + let outputs = tx.as_ref().outputs.as_slice(); + let num_outputs = outputs.len(); + block_utxos_removed += num_outputs; + total_utxos_removed += num_outputs; + + // Get the number of inputs for this transaction + let inputs_count = tx.as_ref().inputs.len(); + + for (vout_idx, output) in outputs.iter().enumerate() { + // Remove UTXOs + utxos_table.remove(&(txno, vout_idx as u32)).map_err(|e| { + BlockProcError::Custom(format!("UTXO removal error: {}", e)) + })?; + + log::debug!( + target: NAME, + "Removed UTXO: txno={}, vout={}", + txno, + vout_idx + ); + + // 4. Clean up script pubkey index for this transaction output + let script = &output.script_pubkey; + if !script.is_empty() { + let txnos = spks_table + .get(script.as_slice()) + .map_err(|e| { + BlockProcError::Custom(format!("SPKs lookup error: {}", e)) + })? + .map(|t| t.value().to_vec()); + + if let Some(mut txnos) = txnos { + // Remove this transaction from the list + if let Some(pos) = txnos.iter().position(|&t| t == txno) { + txnos.remove(pos); + block_spk_entries_cleaned += 1; + + // If the list is not empty, update it; otherwise, remove + // the entry + if !txnos.is_empty() { + spks_table.insert(script.as_slice(), txnos).map_err( + |e| { + BlockProcError::Custom(format!( + "SPKs update error: {}", + e + )) + }, + )?; + } else { + spks_table.remove(script.as_slice()).map_err(|e| { + BlockProcError::Custom(format!( + "SPKs removal error: {}", + e + )) + })?; + } + + log::debug!( + target: NAME, + "Cleaned up SPK index for txno={}, vout={}", + txno, + vout_idx + ); + } + } + } + } + + // 5. Clean up inputs table for this transaction + for input_idx in 0..inputs_count { + if inputs_table + .remove(&(txno, input_idx as u32)) + .map_err(|e| { + BlockProcError::Custom(format!("Inputs removal error: {}", e)) + })? + .is_some() + { + block_inputs_cleaned += 1; + log::debug!( + target: NAME, + "Removed input reference: txno={}, input_idx={}", + txno, + input_idx + ); + } + } + total_inputs_cleaned += block_inputs_cleaned; + + // 6. Clean up this transaction from spending relationships + if outs_table + .remove(txno) + .map_err(|e| { + BlockProcError::Custom(format!("Outs lookup error: {}", e)) + })? + .is_some() + { + block_outs_refs_cleaned += 1; + log::debug!( + target: NAME, + "Removed spending relationship for txno={}", + txno + ); + } + } + + // 7. Remove transaction-block association + tx_blocks_table.remove(txno).map_err(|e| { + BlockProcError::Custom(format!("TX-Blocks removal error: {}", e)) + })?; + } + } + + // 8. Remove this block from the heights tables + heights_table + .remove(height) + .map_err(|e| BlockProcError::Custom(format!("Heights removal error: {}", e)))?; + + block_heights_table.remove(block_id).map_err(|e| { + BlockProcError::Custom(format!("Block height removal error: {}", e)) + })?; + + log::debug!( + target: NAME, + "Removed block height mapping for height {} and block ID {}", + height, + block_id + ); + + total_spk_entries_cleaned += block_spk_entries_cleaned; + total_outs_refs_cleaned += block_outs_refs_cleaned; + + log::info!( + target: NAME, + "Block rollback stats for height {}: removed {} transactions, restored {} UTXOs, removed {} UTXOs, cleaned {} SPK entries, {} input refs, {} output refs", + height, + block_txs_removed, + block_utxos_restored, + block_utxos_removed, + block_spk_entries_cleaned, + block_inputs_cleaned, + block_outs_refs_cleaned + ); + } + + log::info!( + target: NAME, + "Successfully rolled back {} blocks: removed {} transactions, restored {} UTXOs, removed {} UTXOs, cleaned {} SPK entries, {} input refs, {} output refs", + blocks.len(), + total_txs_removed, + total_utxos_restored, + total_utxos_removed, + total_spk_entries_cleaned, + total_inputs_cleaned, + total_outs_refs_cleaned + ); + + Ok(()) + } + + /// Apply blocks from the fork chain to make it the new main chain + /// This method processes all transactions in the fork blocks to ensure + /// the UTXO set and other indexes are properly updated + fn apply_blocks( + &self, + db: &WriteTransaction, + blocks: &[(u32, BlockId)], + ) -> Result { + if blocks.is_empty() { + return Ok(0); + } + + // Get current transaction number - we'll need this for processing new transactions + let mut txno = { + let main = db + .open_table(TABLE_MAIN) + .map_err(BlockProcError::MainTable)?; + // Get current transaction number or use starting value if not found + match main.get(REC_TXNO).map_err(BlockProcError::TxNoAbsent)? { + Some(rec) => TxNo::from_slice(rec.value()).map_err(BlockProcError::TxNoInvalid)?, + None => TxNo::start(), + } + }; + + let mut total_txs_added = 0; + let mut total_utxos_added = 0; + let mut total_utxos_spent = 0; + + let mut tx_ctx = TxTablesContext::new(db)?; + + let fork_blocks_table = db + .open_table(TABLE_FORK_BLOCKS) + .map_err(|e| BlockProcError::Custom(format!("Fork blocks table error: {}", e)))?; + + let mut heights_table = db + .open_table(TABLE_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Heights table error: {}", e)))?; + + let mut block_heights_table = db + .open_table(TABLE_BLOCK_HEIGHTS) + .map_err(|e| BlockProcError::Custom(format!("Block heights table error: {}", e)))?; + + // Iterate through blocks to apply (should be in ascending height order) + for &(height, block_id) in blocks { + log::info!( + target: NAME, + "Applying block at height {}: block ID {}", + height, + block_id + ); + + // Get the complete block data from fork blocks table + let block_data = fork_blocks_table + .get(block_id) + .map_err(|e| BlockProcError::Custom(format!("Fork block lookup error: {}", e)))? + .ok_or_else(|| { + BlockProcError::Custom(format!("Fork block {} not found in database", block_id)) + })? + .value(); + + let block = block_data.as_ref(); + log::debug!( + target: NAME, + "Processing {} transactions from fork block {}", + block.transactions.len(), + block_id + ); + + let mut block_txs_added: usize = 0; + let mut block_utxos_added: usize = 0; + let mut block_utxos_spent: usize = 0; + + // Track UTXOs spent in this block + let mut block_spends = Vec::new(); + + // Track all transactions in this block + let mut block_txs = Vec::new(); + + // Process all transactions in the block + for tx in &block.transactions { + let (_, is_new) = tx_ctx.process_transaction( + tx, + block_id, + &mut txno, + &mut block_txs, + &mut block_spends, + )?; + + // Check if transaction ID is in tracking list and notify if needed + let txid = tx.txid(); + let txid_bytes = txid.to_byte_array(); + if self.should_notify_transaction(txid_bytes) { + self.broker.send(ImporterMsg::Mined(txid))?; + } + + if is_new { + block_txs_added += 1; + } + + // Count UTXOs added (outputs) + block_utxos_added += tx.outputs.len(); + + // Count UTXOs spent (inputs except coinbase) + for input in &tx.inputs { + if !input.prev_output.is_coinbase() { + block_utxos_spent += 1; + } + } + } + + // Finalize the block processing + tx_ctx.finalize_block_processing(db, block_id, block_txs, block_spends, txno)?; + + // Update the heights tables + heights_table + .insert(height, block_id) + .map_err(|e| BlockProcError::Custom(format!("Heights storage error: {}", e)))?; + + block_heights_table.insert(block_id, height).map_err(|e| { + BlockProcError::Custom(format!("Block height storage error: {}", e)) + })?; + + log::debug!( + target: NAME, + "Updated block height mapping for height {} and block ID {}", + height, + block_id + ); + + total_txs_added += block_txs_added; + total_utxos_added += block_utxos_added; + total_utxos_spent += block_utxos_spent; + + log::info!( + target: NAME, + "Block apply stats for height {}: added {} transactions, added {} UTXOs, spent {} UTXOs", + height, + block_txs_added, + block_utxos_added, + block_utxos_spent + ); + } + + log::info!( + target: NAME, + "Successfully applied {} blocks: added {} transactions, added {} UTXOs, spent {} UTXOs", + blocks.len(), + total_txs_added, + total_utxos_added, + total_utxos_spent + ); + + Ok(total_txs_added) + } + + /// Clean up fork information after a successful reorganization + fn cleanup_after_reorg( + &self, + db: &WriteTransaction, + applied_fork_id: ForkId, + ) -> Result<(), BlockProcError> { + // Get information about the applied fork + let (_start_height, _start_block_id, _tip_id, fork_height) = + match self.get_fork_info(db, applied_fork_id) { + Ok(info) => info, + Err(BlockProcError::Custom(msg)) if msg.contains("not found") => { + // Fork already removed, nothing to do + return Ok(()); + } + Err(e) => return Err(e), + }; + + // Remove old forks that are now definitely invalid + // Any fork that starts at a height less than the applied fork's height + // and has not become the main chain by now should be removed + let mut forks_table = db + .open_table(TABLE_FORKS) + .map_err(|e| BlockProcError::Custom(format!("Forks table error: {}", e)))?; + + let mut forks_to_remove = Vec::new(); + + let iter = forks_table + .iter() + .map_err(|e| BlockProcError::Custom(format!("Forks iterator error: {}", e)))?; + + for entry in iter { + let (fork_id, info) = + entry.map_err(|e| BlockProcError::Custom(format!("Fork entry error: {}", e)))?; + + let fork_id_value = fork_id.value(); + + // Skip the fork that was just applied + if fork_id_value == applied_fork_id { + continue; + } + + let (start_height, _start_block_id, tip_id, current_height) = info.value(); + + // If this fork is left behind the main chain, remove it + if start_height < fork_height && current_height <= fork_height { + forks_to_remove.push((fork_id_value, tip_id)); + } + } + + // Now remove the outdated forks + let mut fork_tips_table = db + .open_table(TABLE_FORK_TIPS) + .map_err(|e| BlockProcError::Custom(format!("Fork tips table error: {}", e)))?; + + for (fork_id, tip_id) in &forks_to_remove { + // Remove the tip mapping + fork_tips_table + .remove(*tip_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip removal error: {}", e)))?; + + // Remove the fork entry + forks_table + .remove(*fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork removal error: {}", e)))?; + + log::info!( + target: NAME, + "Removed obsolete fork {} after reorganization", + fork_id + ); + } + + let (_start_height, _start_block_id, tip_id, _current_height) = { + // Finally, remove the applied fork as well + // Get the tip ID for the applied fork + let fork_info = forks_table + .get(applied_fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork lookup error: {}", e)))? + .expect("Applied fork should exist"); + fork_info.value() + }; + + // Remove the tip mapping + fork_tips_table + .remove(tip_id) + .map_err(|e| BlockProcError::Custom(format!("Fork tip removal error: {}", e)))?; + + // Remove the fork entry + forks_table + .remove(applied_fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork removal error: {}", e)))?; + + log::info!( + target: NAME, + "Removed applied fork {} after successful reorganization", + applied_fork_id + ); + + Ok(()) + } + + /// Helper method to get fork information, reducing the need to repeatedly open the forks table + fn get_fork_info( + &self, + db: &WriteTransaction, + fork_id: ForkId, + ) -> Result<(u32, BlockId, BlockId, u32), BlockProcError> { + let forks_table = db + .open_table(TABLE_FORKS) + .map_err(|e| BlockProcError::Custom(format!("Forks table error: {}", e)))?; + + let fork_info = match forks_table + .get(fork_id) + .map_err(|e| BlockProcError::Custom(format!("Fork lookup error: {}", e)))? + { + Some(record) => record.value(), + None => { + return Err(BlockProcError::Custom(format!( + "Fork {} not found in database", + fork_id + ))); + } + }; + + Ok(fork_info) + } } #[derive(Debug, Display, Error, From)] @@ -163,6 +2387,9 @@ pub enum BlockProcError { /// Unable to open blocks table: {0} BlockTable(TableError), + /// Unable to open heights table: {0} + HeightsTable(TableError), + /// Unable to write to blocks table: {0} BlockStorage(StorageError), @@ -177,4 +2404,25 @@ pub enum BlockProcError { /// Unable to write to transactions table: {0} TxesStorage(StorageError), + + /// Error looking up transaction ID: {0} + TxidLookup(StorageError), + + /// Unable to find block: {0} + BlockLookup(StorageError), + + /// Orphan block detected: parent block {0} not found + OrphanBlock(BlockHash), + + /// Potential fork detected: new block {0} at height {1} conflicts with existing block {2} + PotentialFork(BlockHash, u32, BlockId), + + /// Fork chain extension: new block {0} extends fork chain with parent block {1} + ForkChainExtension(BlockHash, BlockHash), + + /// Database inconsistency: block {0} has parent {1} with ID {2} but missing height + DatabaseInconsistency(BlockHash, BlockHash, BlockId), + + /// Custom error: {0} + Custom(String), } diff --git a/src/broker.rs b/src/broker.rs index 594d299..de705ef 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -72,7 +72,7 @@ impl Broker { const TIMEOUT: Option = Some(Duration::from_secs(60 * 10)); log::info!("Starting database managing thread..."); - let indexdb = IndexDb::new(&conf.data_dir.join(PATH_INDEXDB))?; + let indexdb = IndexDb::new(conf.data_dir.join(PATH_INDEXDB))?; let db = UThread::new(indexdb, TIMEOUT); log::info!("Starting block importer thread..."); @@ -81,7 +81,7 @@ impl Broker { let listen = conf.import.iter().map(|addr| { NetAccept::bind(addr).unwrap_or_else(|err| panic!("unable to bind to {addr}: {err}")) }); - let importer = service::Runtime::new(conf.import[0].clone(), controller, listen) + let importer = service::Runtime::new(conf.import[0], controller, listen) .map_err(|err| BrokerError::Import(err.into()))?; log::info!("Starting RPC server thread..."); @@ -90,7 +90,7 @@ impl Broker { let listen = conf.rpc.iter().map(|addr| { NetAccept::bind(addr).unwrap_or_else(|err| panic!("unable to bind to {addr}: {err}")) }); - let rpc = service::Runtime::new(conf.rpc[0].clone(), controller, listen) + let rpc = service::Runtime::new(conf.rpc[0], controller, listen) .map_err(|err| BrokerError::Rpc(err.into()))?; log::info!("Launch completed successfully"); @@ -161,9 +161,10 @@ impl Broker { match msg { ImporterMsg::Mined(txid) => { for (remote, filters) in &self.tracking { - // TODO: Check against Bloom filters - if false { - self.rpc.cmd(RpcCmd::Send(*remote, Response::Mined(txid)))?; + for filter in filters { + if filter.contains(txid) { + self.rpc.cmd(RpcCmd::Send(*remote, Response::Mined(txid)))?; + } } } } diff --git a/src/config.rs b/src/config.rs index 17a0cb7..bace821 100644 --- a/src/config.rs +++ b/src/config.rs @@ -35,9 +35,15 @@ pub struct Config { /// Data location pub data_dir: PathBuf, + /// Bitcoin network type (mainnet, testnet, etc.) + /// + /// Each BP-Node instance is designed to work with a single network type. + /// To work with multiple networks, create separate instances with different data directories. pub network: Network, + /// Addresses to listen for RPC connections pub rpc: Vec, + /// Addresses to listen for block import connections pub import: Vec, } diff --git a/src/db.rs b/src/db.rs index 698f790..0a678d9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -24,25 +24,62 @@ use std::cmp::Ordering; use std::ops::ControlFlow; use std::path::Path; +use std::process::exit; use amplify::num::u40; use amplify::{ByteArray, FromSliceError}; -use bpwallet::{BlockHeader, ConsensusDecode, ConsensusEncode, Tx}; +use bpwallet::{Block, BlockHeader, ConsensusDecode, ConsensusEncode, Network, Tx}; use crossbeam_channel::{SendError, Sender}; use microservices::UService; use redb::{ - Database, DatabaseError, ReadTransaction, TableDefinition, TransactionError, TypeName, - WriteTransaction, + Database, DatabaseError, Key, ReadTransaction, TableDefinition, TransactionError, TypeName, + Value, WriteTransaction, }; +// see also constants in `bin/bpd.rs` +const EXIT_DB_INIT_MAIN_TABLE: i32 = 6; +const EXIT_DB_INIT_TABLE: i32 = 7; +const EXIT_DB_INIT_ERROR: i32 = 8; + #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display)] #[display("#{0:010X}")] pub struct TxNo(u40); +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Display)] +#[display("#{0:08X}")] +pub struct Id(u32); + +pub type BlockId = Id; +pub type ForkId = Id; + impl TxNo { pub fn start() -> Self { TxNo(u40::ONE) } pub fn inc_assign(&mut self) { self.0 += u40::ONE } + + pub fn into_inner(self) -> u40 { self.0 } +} + +impl Id { + pub fn start() -> Self { Id(0) } + + pub fn inc_assign(&mut self) { self.0 += 1 } + + // Method to access the u32 value + pub fn as_u32(&self) -> u32 { self.0 } + + // Method to get bytes representation + pub fn to_bytes(&self) -> [u8; 4] { self.0.to_be_bytes() } + + // Method to create Id from bytes + pub fn from_bytes(bytes: &[u8]) -> Self { + debug_assert_eq!(bytes.len(), 4); + let mut array = [0u8; 4]; + array.copy_from_slice(bytes); + Id(u32::from_be_bytes(array)) + } + + pub fn into_inner(self) -> u32 { self.0 } } impl ByteArray<5> for TxNo { @@ -68,6 +105,9 @@ impl ByteArray<5> for TxNo { #[derive(Wrapper, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Debug, From)] pub struct DbBlockHeader(#[from] BlockHeader); +#[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)] +pub struct DbBlock(#[from] Block); + #[derive(Wrapper, Clone, Eq, PartialEq, Debug, From)] pub struct DbTx(#[from] Tx); @@ -115,6 +155,25 @@ impl redb::Value for DbBlockHeader { fn type_name() -> TypeName { TypeName::new("BpNodeBlockHeader") } } +impl redb::Value for DbBlock { + type SelfType<'a> = Self; + type AsBytes<'a> = Vec; + + fn fixed_width() -> Option { None } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where Self: 'a { + Self(unsafe { Block::consensus_deserialize(data).unwrap_unchecked() }) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where Self: 'b { + value.0.consensus_serialize() + } + + fn type_name() -> TypeName { TypeName::new("BpNodeBlock") } +} + impl redb::Value for DbTx { type SelfType<'a> = Self; type AsBytes<'a> = Vec; @@ -134,15 +193,114 @@ impl redb::Value for DbTx { fn type_name() -> TypeName { TypeName::new("BpNodeTx") } } +impl redb::Key for Id { + fn compare(data1: &[u8], data2: &[u8]) -> Ordering { data1.cmp(data2) } +} + +impl redb::Value for Id { + type SelfType<'a> = Self; + + type AsBytes<'a> = [u8; 4]; + + fn fixed_width() -> Option { Some(4) } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where Self: 'a { + Id::from_bytes(data) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where Self: 'b { + value.to_bytes() + } + + fn type_name() -> TypeName { TypeName::new("BpNodeBlockId") } +} + +pub const REC_TXNO: &str = "txno"; +pub const REC_BLOCKID: &str = "blockid"; +pub const REC_CHAIN: &str = "chain"; +pub const REC_ORPHANS: &str = "orphans"; +// Network information record in main table +pub const REC_NETWORK: &str = "network"; +// Constants for fork management +pub const REC_FORK_ID: &str = "forkid"; + +// Main metadata table storing global counters and states pub const TABLE_MAIN: TableDefinition<&'static str, &[u8]> = TableDefinition::new("main"); -pub const TABLE_BLKS: TableDefinition<[u8; 32], DbBlockHeader> = TableDefinition::new("blocks"); + +// Maps block hash to block header +pub const TABLE_BLKS: TableDefinition = TableDefinition::new("blocks"); + +// Maps transaction ID to internal transaction number pub const TABLE_TXIDS: TableDefinition<[u8; 32], TxNo> = TableDefinition::new("txids"); + +// Maps block hash to internal block ID +pub const TABLE_BLOCKIDS: TableDefinition<[u8; 32], BlockId> = TableDefinition::new("blockids"); + +// Stores complete transaction data pub const TABLE_TXES: TableDefinition = TableDefinition::new("transactions"); + +// Maps transaction number to transaction numbers that spend its outputs pub const TABLE_OUTS: TableDefinition> = TableDefinition::new("spends"); -pub const TABLE_SPKS: TableDefinition<&[u8], TxNo> = TableDefinition::new("scripts"); -pub const REC_TXNO: &str = "txno"; +// Maps script pubkey to a list of transaction numbers containing it +pub const TABLE_SPKS: TableDefinition<&[u8], Vec> = TableDefinition::new("scripts"); + +// Tracks unspent transaction outputs +pub const TABLE_UTXOS: TableDefinition<(TxNo, u32), ()> = TableDefinition::new("utxos"); + +// Maps block height to block ID +pub const TABLE_HEIGHTS: TableDefinition = TableDefinition::new("block_heights"); + +// Maps block ID to block height (reverse of TABLE_HEIGHTS) +pub const TABLE_BLOCK_HEIGHTS: TableDefinition = + TableDefinition::new("blockid_height"); + +// Maps transaction number to the block ID it belongs to +pub const TABLE_TX_BLOCKS: TableDefinition = TableDefinition::new("tx_blocks"); +// Maps block ID to all transaction numbers it contains +pub const TABLE_BLOCK_TXS: TableDefinition> = TableDefinition::new("block_txs"); + +// Maps transaction input to the output it spends +pub const TABLE_INPUTS: TableDefinition<(TxNo, u32), (TxNo, u32)> = TableDefinition::new("inputs"); + +// Records all UTXOs spent in each block for potential rollback +pub const TABLE_BLOCK_SPENDS: TableDefinition> = + TableDefinition::new("block_spends"); + +// Stores orphan blocks (blocks received without their parent blocks) +// Maps block hash to (block data, timestamp) +// Note: Orphan blocks are not assigned BlockId values because: +// 1. They are in a temporary state and may never become part of the main chain +// 2. Many orphans may eventually be discarded when their ancestry is resolved +// 3. BlockId resources are preserved for blocks that are (or may become) part of the chain +pub const TABLE_ORPHANS: TableDefinition<[u8; 32], (DbBlock, u64)> = + TableDefinition::new("orphans"); + +// Maps parent block hash to list of orphan blocks that depend on it +pub const TABLE_ORPHAN_PARENTS: TableDefinition<[u8; 32], Vec<[u8; 32]>> = + TableDefinition::new("orphan_parents"); + +// Tracks blockchain forks - maps fork ID to (fork_start_height, fork_start_block_id, tip_block_id, +// current_height) +pub const TABLE_FORKS: TableDefinition = + TableDefinition::new("forks"); + +// Maps fork tip block ID to fork ID for quick lookup +pub const TABLE_FORK_TIPS: TableDefinition = TableDefinition::new("fork_tips"); + +// Stores complete block data for fork blocks +// This allows us to access the full block content when performing chain reorganization +// Fork blocks are stored with their assigned BlockId like main chain blocks +pub const TABLE_FORK_BLOCKS: TableDefinition = + TableDefinition::new("fork_blocks"); + +// Each BP-Node instance is designed to work with a single blockchain network. +// If multiple networks need to be indexed, separate instances should be used +// with different data directories. The network information is stored in the +// MAIN table under the REC_NETWORK key. pub struct IndexDb(Database); impl IndexDb { @@ -195,3 +353,110 @@ impl UService for IndexDb { } } } + +/// Initialize database tables +pub fn initialize_db_tables(db: &Database, network: Network) { + // It's necessary to open all tables with WriteTransaction to ensure they are created + // In ReDB, tables are only created when first opened with a WriteTransaction + // If later accessed with ReadTransaction without being created first, errors will occur + match db.begin_write() { + Ok(tx) => { + // Initialize main table with network information + initialize_main_table(&tx, network); + + // Initialize all other tables by group + create_core_tables(&tx); + create_utxo_tables(&tx); + create_block_height_tables(&tx); + create_transaction_block_tables(&tx); + create_orphan_tables(&tx); + create_fork_tables(&tx); + + // Commit the transaction + if let Err(err) = tx.commit() { + eprintln!("Failed to commit initial database transaction: {err}"); + exit(EXIT_DB_INIT_ERROR); + } + } + Err(err) => { + eprintln!("Failed to begin database transaction: {err}"); + exit(EXIT_DB_INIT_ERROR); + } + } +} + +/// Initialize the main table with network information +fn initialize_main_table(tx: &WriteTransaction, network: Network) { + match tx.open_table(TABLE_MAIN) { + Ok(mut main_table) => { + if let Err(err) = main_table.insert(REC_NETWORK, network.to_string().as_bytes()) { + eprintln!("Failed to write network information to database: {err}"); + exit(EXIT_DB_INIT_MAIN_TABLE); + } + } + Err(err) => { + eprintln!("Failed to open main table in database: {err}"); + exit(EXIT_DB_INIT_MAIN_TABLE); + } + } +} + +/// Create core block and transaction tables +fn create_core_tables(tx: &WriteTransaction) { + log::info!("Creating core block and transaction tables..."); + create_table(tx, TABLE_BLKS, "blocks"); + create_table(tx, TABLE_TXIDS, "txids"); + create_table(tx, TABLE_BLOCKIDS, "blockids"); + create_table(tx, TABLE_TXES, "transactions"); +} + +/// Create UTXO and transaction relationship tables +fn create_utxo_tables(tx: &WriteTransaction) { + log::info!("Creating UTXO and transaction relationship tables..."); + create_table(tx, TABLE_OUTS, "spends"); + create_table(tx, TABLE_SPKS, "scripts"); + create_table(tx, TABLE_UTXOS, "utxos"); +} + +/// Create block height mapping tables +fn create_block_height_tables(tx: &WriteTransaction) { + log::info!("Creating block height mapping tables..."); + create_table(tx, TABLE_HEIGHTS, "block_heights"); + create_table(tx, TABLE_BLOCK_HEIGHTS, "blockid_height"); +} + +/// Create transaction-block relationship tables +fn create_transaction_block_tables(tx: &WriteTransaction) { + log::info!("Creating transaction-block relationship tables..."); + create_table(tx, TABLE_TX_BLOCKS, "tx_blocks"); + create_table(tx, TABLE_BLOCK_TXS, "block_txs"); + create_table(tx, TABLE_INPUTS, "inputs"); + create_table(tx, TABLE_BLOCK_SPENDS, "block_spends"); +} + +/// Create orphan blocks tables +fn create_orphan_tables(tx: &WriteTransaction) { + log::info!("Creating orphan blocks tables..."); + create_table(tx, TABLE_ORPHANS, "orphans"); + create_table(tx, TABLE_ORPHAN_PARENTS, "orphan_parents"); +} + +/// Create fork management tables +fn create_fork_tables(tx: &WriteTransaction) { + log::info!("Creating fork management tables..."); + create_table(tx, TABLE_FORKS, "forks"); + create_table(tx, TABLE_FORK_TIPS, "fork_tips"); + create_table(tx, TABLE_FORK_BLOCKS, "fork_blocks"); +} + +/// Generic function to create a table with error handling +fn create_table( + tx: &WriteTransaction, + table_def: TableDefinition, + table_name: &str, +) { + if let Err(err) = tx.open_table(table_def) { + eprintln!("Failed to create {} table: {err}", table_name); + exit(EXIT_DB_INIT_TABLE); + } +} diff --git a/src/importer.rs b/src/importer.rs index 8416c78..66a4b66 100644 --- a/src/importer.rs +++ b/src/importer.rs @@ -120,7 +120,7 @@ impl ServiceController for BlockI let client = self.providers.remove(&addr).unwrap_or_else(|| { panic!("Block provider at {addr} got disconnected but not found in providers list"); }); - log::warn!(target: NAME, "Block provider at {addr} got disconnected due to {reason} ({})", client.agent.map(|a| a.to_string()).unwrap_or(none!())); + log::warn!(target: NAME, "Block provider at {addr} got disconnected due to {reason} ({})", client.agent.map(|a| a.to_string()).unwrap_or_default()); } fn on_command(&mut self, cmd: ImporterCmd) { @@ -155,7 +155,7 @@ impl ServiceController for BlockI ExporterPub::Block(block) => { let block_id = block.header.block_hash(); log::debug!("Received block {block_id} from {remote}"); - match self.processor.process_block(block_id, block) { + match self.processor.process_block_and_orphans(block_id, block) { Err(err) => { log::error!(target: NAME, "{err}"); log::warn!(target: NAME, "Block {block_id} got dropped due to database connectivity issue"); diff --git a/src/lib.rs b/src/lib.rs index 27df240..2d6d36e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! BP-Node: Bitcoin blockchain indexer +//! +//! Each BP-Node instance is designed to work with a single blockchain network. +//! If multiple networks need to be indexed (mainnet, testnet, etc.), separate +//! instances should be used with different data directories. + #[macro_use] extern crate amplify; @@ -35,6 +41,7 @@ mod importer; pub use blocks::{BlockProcError, BlockProcessor}; pub use broker::{Broker, BrokerError, BrokerRpcMsg, PATH_INDEXDB, TrackReq}; pub use config::Config; +pub use db::{REC_NETWORK, initialize_db_tables}; pub use importer::{BlockImporter, ImporterCmd, ImporterMsg}; pub use rpc::{RpcCmd, RpcController}; //pub use query::{QueryWorker, QueryReq, QueryResp}; diff --git a/src/rpc.rs b/src/rpc.rs index e5e9a84..adf830e 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -110,7 +110,7 @@ impl ServiceController for RpcControll let client = self.clients.remove(&remote).unwrap_or_else(|| { panic!("Client at {remote} got disconnected but not found in providers list"); }); - log::warn!(target: NAME, "Client at {remote} got disconnected due to {reason} ({})", client.agent.map(|a| a.to_string()).unwrap_or(none!())); + log::warn!(target: NAME, "Client at {remote} got disconnected due to {reason} ({})", client.agent.map(|a| a.to_string()).unwrap_or_default()); self.broker .send(BrokerRpcMsg::UntrackAll(remote)) .expect("Unable to communicate to broker");