Skip to content

Commit

Permalink
add clear unused transaction logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Oct 29, 2024
1 parent 8d95ca6 commit a828a85
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 11 deletions.
62 changes: 60 additions & 2 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::Mutex,
};
use std::{convert::TryInto, io::Cursor};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use crate::mempool::JDsMempool;

use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
use tracing::{debug, info};

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -61,6 +64,9 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
if let Some(old_mining_job) = self.declared_mining_job.0.take() {
clear_declared_mining_job(old_mining_job, &message, self.mempool.clone())?;

Check warning on line 68 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L67-L68

Added lines #L67 - L68 were not covered by tests
}
// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
Expand Down Expand Up @@ -219,3 +225,55 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
Ok(SendTo::None(Some(m)))
}
}

pub fn clear_declared_mining_job(

Check warning on line 229 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L229

Added line #L229 was not covered by tests
old_mining_job: DeclareMiningJob,
new_mining_job: &DeclareMiningJob,
mempool: Arc<Mutex<JDsMempool>>,
) -> Result<(), Error> {
let old_transactions = old_mining_job.tx_short_hash_list.inner_as_ref();
let new_transactions = new_mining_job.tx_short_hash_list.inner_as_ref();

if old_transactions.is_empty() {
info!("No transactions to remove from mempool");
return Ok(());

Check warning on line 239 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L237-L239

Added lines #L237 - L239 were not covered by tests
}

let nonce = old_mining_job.tx_short_hash_nonce;

Check warning on line 242 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L242

Added line #L242 was not covered by tests

let result = mempool.safe_lock(|mempool_| -> Result<(), Error> {
let short_ids_map = mempool_
.to_short_ids(nonce)
.ok_or(Error::JDSMissingTransactions)?;

Check warning on line 247 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L244-L247

Added lines #L244 - L247 were not covered by tests

for short_id in old_transactions {
if !new_transactions.contains(&short_id) {
if let Some(transaction_with_hash) = short_ids_map.get(short_id) {
let txid = transaction_with_hash.id;
match mempool_.mempool.get_mut(&txid) {

Check warning on line 253 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L249-L253

Added lines #L249 - L253 were not covered by tests
Some(Some((transaction, counter))) => {
if *counter > 1 {
*counter -= 1;
info!("Fat transaction {:?} decreased mempool counter because job id {:?} was dropped", txid, old_mining_job.request_id);

Check warning on line 257 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L255-L257

Added lines #L255 - L257 were not covered by tests
} else {
mempool_.mempool.remove(&txid);
info!("Fat transaction {:?} in job with request id {:?} removed from mempool", txid, old_mining_job.request_id);

Check warning on line 260 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L260

Added line #L260 was not covered by tests
}
},
Some(None) => info!("Thin transaction {:?} in job with request id {:?} removed from mempool", txid, old_mining_job.request_id),

Check warning on line 263 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L263

Added line #L263 was not covered by tests
None => {},
}
} else {
debug!("Transaction with short id {:?} not found in mempool while clearing old jobs", short_id);

Check warning on line 267 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L267

Added line #L267 was not covered by tests
}
}
}
Ok(())

Check warning on line 271 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L271

Added line #L271 was not covered by tests
});

if let Err(err) = result {
return Err(Error::PoisonLock(err.to_string()));

Check warning on line 275 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L274-L275

Added lines #L274 - L275 were not covered by tests
}

Ok(())

Check warning on line 278 in roles/jd-server/src/lib/job_declarator/message_handler.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/message_handler.rs#L278

Added line #L278 was not covered by tests
}
2 changes: 1 addition & 1 deletion roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl JobDeclaratorDownstream {
.ok_or(Box::new(JdsError::ImpossibleToReconstructBlock(
"Txid found in jds mempool but transactions not present".to_string(),
)))?;
transactions_list.push(tx);
transactions_list.push(tx.0);

Check warning on line 148 in roles/jd-server/src/lib/job_declarator/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/job_declarator/mod.rs#L148

Added line #L148 was not covered by tests
} else {
return Err(Box::new(JdsError::ImpossibleToReconstructBlock(
"Unknown transaction".to_string(),
Expand Down
19 changes: 11 additions & 8 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use stratum_common::{bitcoin, bitcoin::hash_types::Txid};
#[derive(Clone, Debug)]
pub struct TransactionWithHash {
pub id: Txid,
pub tx: Option<Transaction>,
pub tx: Option<(Transaction, u32)>,
}

#[derive(Clone, Debug)]
pub struct JDsMempool {
pub mempool: HashMap<Txid, Option<Transaction>>,
pub mempool: HashMap<Txid, Option<(Transaction, u32)>>,
auth: mini_rpc_client::Auth,
url: String,
new_block_receiver: Receiver<String>,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl JDsMempool {
new_block_receiver: Receiver<String>,
) -> Self {
let auth = mini_rpc_client::Auth::new(username, password);
let empty_mempool: HashMap<Txid, Option<Transaction>> = HashMap::new();
let empty_mempool: HashMap<Txid, Option<(Transaction, u32)>> = HashMap::new();
JDsMempool {
mempool: empty_mempool,
auth,
Expand Down Expand Up @@ -82,28 +82,31 @@ impl JDsMempool {
.get_raw_transaction(&txid.to_string(), None)
.await
.map_err(JdsMempoolError::Rpc)?;
let _ =
self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ = self_
.safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1))));

Check warning on line 86 in roles/jd-server/src/lib/mempool/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mempool/mod.rs#L85-L86

Added lines #L85 - L86 were not covered by tests
}
}

// fill in the mempool the transactions given in input
for transaction in transactions {
let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ =

Check warning on line 92 in roles/jd-server/src/lib/mempool/mod.rs

View check run for this annotation

Codecov / codecov/patch

roles/jd-server/src/lib/mempool/mod.rs#L92

Added line #L92 was not covered by tests
self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some((transaction, 1))));
}
Ok(())
}

pub async fn update_mempool(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {

let client = self_
.safe_lock(|x| x.get_client())?
.ok_or(JdsMempoolError::NoClient)?;

let mempool = client.get_raw_mempool().await?;
let raw_mempool_txids: Result<Vec<Txid>, _> = mempool
.into_iter()
.map(|id| Txid::from_str(&id).map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string()))))
.map(|id| {
Txid::from_str(&id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))
})
.collect();

let raw_mempool_txids = raw_mempool_txids?;
Expand Down

0 comments on commit a828a85

Please sign in to comment.