Skip to content

Commit

Permalink
Refactor fetcher to use the main block source type
Browse files Browse the repository at this point in the history
  • Loading branch information
buffrr committed Sep 16, 2024
1 parent 57ae9bf commit 769fdcd
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 60 deletions.
13 changes: 8 additions & 5 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use protocol::{
};
use serde::{Deserialize, Serialize};

use crate::store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256};
use crate::{
source::BitcoinRpcError,
store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256},
};

pub trait BlockSource {
fn get_block_hash(&self, height: u32) -> Result<BlockHash>;
fn get_block(&self, hash: &BlockHash) -> Result<Block>;
fn get_median_time(&self) -> anyhow::Result<u64>;
fn get_block_count(&self) -> Result<u64>;
fn get_block_hash(&self, height: u32) -> Result<BlockHash, BitcoinRpcError>;
fn get_block(&self, hash: &BlockHash) -> Result<Block, BitcoinRpcError>;
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
}

#[derive(Debug, Clone)]
Expand Down
69 changes: 31 additions & 38 deletions node/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
time::Duration,
};

use anyhow::anyhow;
use base64::Engine;
use bitcoin::{Block, BlockHash, Txid};
use hex::FromHexError;
Expand All @@ -34,8 +33,7 @@ pub struct BitcoinRpc {
}

pub struct BlockFetcher {
client: reqwest::blocking::Client,
rpc: Arc<BitcoinRpc>,
src: BitcoinBlockSource,
job_id: Arc<AtomicUsize>,
sender: std::sync::mpsc::SyncSender<BlockEvent>,
num_workers: usize,
Expand Down Expand Up @@ -303,15 +301,13 @@ impl BitcoinRpcAuth {

impl BlockFetcher {
pub fn new(
rpc: BitcoinRpc,
client: reqwest::blocking::Client,
src: BitcoinBlockSource,
num_workers: usize,
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
let (tx, rx) = std::sync::mpsc::sync_channel(12);
(
Self {
client,
rpc: Arc::new(rpc),
src,
job_id: Arc::new(AtomicUsize::new(0)),
sender: tx,
num_workers,
Expand All @@ -328,8 +324,7 @@ impl BlockFetcher {
self.stop();

let job_id = self.job_id.load(Ordering::SeqCst);
let task_client = self.client.clone();
let task_rpc = self.rpc.clone();
let task_src = self.src.clone();
let current_task = self.job_id.clone();
let task_sender = self.sender.clone();
let num_workers = self.num_workers;
Expand All @@ -348,20 +343,19 @@ impl BlockFetcher {
}
last_check = Instant::now();

let tip: u32 =
match task_rpc.send_json_blocking(&task_client, &task_rpc.get_block_count()) {
Ok(t) => t,
Err(e) => {
_ = task_sender.send(BlockEvent::Error(BlockFetchError::RpcError(e)));
return;
}
};
let tip: u32 = match task_src.get_block_count() {
Ok(t) => t as _,
Err(e) => {
_ = task_sender.send(BlockEvent::Error(BlockFetchError::RpcError(e)));
return;
}
};

if tip > checkpoint.height {
let res = Self::run_workers(
job_id,
current_task.clone(),
task_rpc.clone(),
task_src.clone(),
task_sender.clone(),
checkpoint,
tip,
Expand All @@ -386,7 +380,7 @@ impl BlockFetcher {
fn run_workers(
job_id: usize,
current_job: Arc<AtomicUsize>,
rpc: Arc<BitcoinRpc>,
src: BitcoinBlockSource,
sender: std::sync::mpsc::SyncSender<BlockEvent>,
start_block: ChainAnchor,
end_height: u32,
Expand All @@ -400,7 +394,7 @@ impl BlockFetcher {
queued_height: start_block.height + 1,
end_height,
ordered_sender: sender,
rpc,
src,
num_workers,
pool: ThreadPool::new(num_workers),
};
Expand All @@ -409,14 +403,14 @@ impl BlockFetcher {
}

pub fn fetch_block(
rpc: &BitcoinRpc,
client: &reqwest::blocking::Client,
source: &BitcoinBlockSource,
hash: &BlockHash,
) -> Result<Block, BitcoinRpcError> {
let block_req = rpc.get_block(&hash);
let block_req = source.rpc.get_block(&hash);
let id = block_req.id;
let response = rpc
.send_request_blocking(client, &block_req)?
let response = source
.rpc
.send_request_blocking(&source.client, &block_req)?
.error_for_status()?;
let mut raw = response.bytes()?.to_vec();

Expand Down Expand Up @@ -465,7 +459,7 @@ struct Workers {
queued_height: u32,
end_height: u32,
ordered_sender: std::sync::mpsc::SyncSender<BlockEvent>,
rpc: Arc<BitcoinRpc>,
src: BitcoinBlockSource,
num_workers: usize,
pool: ThreadPool,
}
Expand Down Expand Up @@ -521,7 +515,6 @@ impl Workers {
}

fn run(&mut self) -> Result<ChainAnchor, BlockFetchError> {
let client = reqwest::blocking::Client::new();
let (tx, rx) = std::sync::mpsc::sync_channel(self.num_workers);

'queue_blocks: while !self.queued_all() {
Expand All @@ -534,8 +527,7 @@ impl Workers {
return Err(BlockFetchError::ChannelClosed);
}
let tx = tx.clone();
let rpc = self.rpc.clone();
let task_client = client.clone();
let rpc = self.src.clone();
let task_sigterm = self.current_job.clone();
let height = self.queued_height;
let job_id = self.job_id;
Expand All @@ -545,9 +537,8 @@ impl Workers {
return;
}
let result: Result<_, BitcoinRpcError> = (move || {
let hash: BlockHash =
rpc.send_json_blocking(&task_client, &rpc.get_block_hash(height))?;
let block = BlockFetcher::fetch_block(&rpc, &task_client, &hash)?;
let hash: BlockHash = rpc.get_block_hash(height)?;
let block = BlockFetcher::fetch_block(&rpc, &hash)?;
Ok((ChainAnchor { height, hash }, block))
})();
_ = tx.send(result);
Expand Down Expand Up @@ -675,7 +666,7 @@ impl ErrorForRpc for reqwest::Response {
return Err(BitcoinRpcError::Rpc(e));
}

return Ok(rpc_res.result.unwrap());
Ok(rpc_res.result.unwrap())
}
}

Expand Down Expand Up @@ -704,29 +695,31 @@ impl BitcoinBlockSource {
}

impl BlockSource for BitcoinBlockSource {
fn get_block_hash(&self, height: u32) -> anyhow::Result<BlockHash> {
fn get_block_hash(&self, height: u32) -> Result<BlockHash, BitcoinRpcError> {
Ok(self
.rpc
.send_json_blocking(&self.client, &self.rpc.get_block_hash(height))?)
}

fn get_block(&self, hash: &BlockHash) -> anyhow::Result<Block> {
fn get_block(&self, hash: &BlockHash) -> Result<Block, BitcoinRpcError> {
Ok(self
.rpc
.send_json_blocking(&self.client, &self.rpc.get_block(hash))?)
}

fn get_median_time(&self) -> anyhow::Result<u64> {
fn get_median_time(&self) -> Result<u64, BitcoinRpcError> {
let info: serde_json::Value = self
.rpc
.send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?;
if let Some(time) = info.get("mediantime").and_then(|t| t.as_u64()) {
return Ok(time);
}
return Err(anyhow!("Could not fetch median time"));
Err(BitcoinRpcError::Other(
"Could not fetch median time".to_string(),
))
}

fn get_block_count(&self) -> anyhow::Result<u64> {
fn get_block_count(&self) -> Result<u64, BitcoinRpcError> {
Ok(self
.rpc
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
Expand Down
4 changes: 1 addition & 3 deletions node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ impl Spaced {
start_block.hash, start_block.height
);

let rpc = source.rpc.clone();
let client = reqwest::blocking::Client::new();
let (fetcher, receiver) = BlockFetcher::new(rpc.clone(), client.clone(), self.num_workers);
let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers);
fetcher.start(start_block);

let mut shutdown_signal = shutdown.subscribe();
Expand Down
3 changes: 1 addition & 2 deletions node/src/wallets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,7 @@ impl RpcWallet {
mut shutdown: broadcast::Receiver<()>,
num_workers: usize,
) -> anyhow::Result<()> {
let (fetcher, receiver) =
BlockFetcher::new(source.rpc.clone(), source.client.clone(), num_workers);
let (fetcher, receiver) = BlockFetcher::new(source.clone(), num_workers);

let mut wallet_tip = {
let tip = wallet.coins.local_chain().tip();
Expand Down
17 changes: 8 additions & 9 deletions node/tests/fetcher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::{

use anyhow::Result;
use protocol::{bitcoin::BlockHash, constants::ChainAnchor};
use reqwest::blocking::Client;
use spaced::source::{BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher};
use spaced::source::{BitcoinBlockSource, BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher};
use testutil::TestRig;

async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> {
Expand All @@ -21,16 +20,16 @@ async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> {
fn test_block_fetching_from_bitcoin_rpc() -> Result<()> {
const GENERATED_BLOCKS: u64 = 10;

let (rig, mut height, hash) = tokio::runtime::Runtime::new()?
.block_on(setup(GENERATED_BLOCKS))?;
let fetcher_rpc = BitcoinRpc::new(
let (rig, mut height, hash) =
tokio::runtime::Runtime::new()?.block_on(setup(GENERATED_BLOCKS))?;
let fetcher_rpc = BitcoinBlockSource::new(BitcoinRpc::new(
&rig.bitcoind.rpc_url(),
BitcoinRpcAuth::UserPass("user".to_string(), "password".to_string()),
));
let (fetcher, receiver) = BlockFetcher::new(
fetcher_rpc.clone(),
8
);

let client = Client::new();
let (fetcher, receiver) = BlockFetcher::new(fetcher_rpc.clone(), client.clone(), 8);

fetcher.start(ChainAnchor { hash, height: 0 });

let timeout = Duration::from_secs(5);
Expand Down
6 changes: 3 additions & 3 deletions node/tests/wallet_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use protocol::bitcoin::{Address, Amount};
use spaced::rpc::RpcClient;
use spaced::wallets::AddressKind;
use spaced::{rpc::RpcClient, wallets::AddressKind};
use testutil::TestRig;

async fn setup() -> anyhow::Result<TestRig> {
Expand All @@ -22,7 +22,7 @@ async fn it_should_create_and_fund_wallet(rig: &TestRig) -> anyhow::Result<()> {
.wallet_get_new_address(name.clone(), AddressKind::Coin)
.await?,
)?
.assume_checked();
.assume_checked();
// have the rig send some coins
rig.send(&addr, Amount::from_sat(1000_000)).await?;
// mine the transaction
Expand Down

0 comments on commit 769fdcd

Please sign in to comment.