Skip to content

Commit

Permalink
Merge pull request #1095 from jbesraa/2024-08-13-refactor-jdserver
Browse files Browse the repository at this point in the history
Move `JDServer` lib code out of `main.rs`
  • Loading branch information
plebhash committed Aug 30, 2024
2 parents 4173c4d + ef254e1 commit 5347351
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 168 deletions.
234 changes: 234 additions & 0 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ pub mod job_declarator;
pub mod mempool;
pub mod status;

use async_channel::{bounded, unbounded, Receiver, Sender};
use error_handling::handle_result;
use job_declarator::JobDeclarator;
use mempool::error::JdsMempoolError;
use roles_logic_sv2::utils::Mutex;
use std::{ops::Sub, sync::Arc};
use tokio::{select, task};
use tracing::{error, info, warn};

use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use roles_logic_sv2::{
Expand All @@ -19,6 +28,177 @@ pub type Message = JdsMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;

pub struct JobDeclaratorServer {
config: Configuration,
}

impl JobDeclaratorServer {
pub fn new(config: Configuration) -> Self {
Self { config }
}
pub async fn start(&self) {
let config = self.config.clone();
let url = config.core_rpc_url.clone() + ":" + &config.core_rpc_port.clone().to_string();
let username = config.core_rpc_user.clone();
let password = config.core_rpc_pass.clone();
// TODO should we manage what to do when the limit is reaced?
let (new_block_sender, new_block_receiver): (Sender<String>, Receiver<String>) =
bounded(10);
let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new(
url.clone(),
username,
password,
new_block_receiver,
)));
let mempool_update_interval = config.mempool_update_interval;
let mempool_cloned_ = mempool.clone();
let (status_tx, status_rx) = unbounded();
let sender = status::Sender::Downstream(status_tx.clone());
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));

// TODO if the jd-server is launched with core_rpc_url empty, the following flow is never
// taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly
// reaching the channel bound. The new_block_sender is given as input to JobDeclarator::start()
if url.contains("http") {
let sender_update_mempool = sender.clone();
task::spawn(async move {
loop {
let update_mempool_result: Result<(), mempool::error::JdsMempoolError> =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
if let Err(err) = update_mempool_result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
JdsMempoolError::NoClient => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::Rpc(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::PoisonLock(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
}
}
tokio::time::sleep(mempool_update_interval).await;
// DO NOT REMOVE THIS LINE
//let _transactions = mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
_ => {
// TODO here there should be a better error managmenet
mempool::error::handle_error(&err);
handle_result!(sender_submit_solution, Err(err));
}
}
}
}
});
};

let cloned = config.clone();
let mempool_cloned = mempool.clone();
let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded();
task::spawn(async move {
JobDeclarator::start(
cloned,
sender,
mempool_cloned,
new_block_sender,
sender_add_txs_to_mempool,
)
.await
});
task::spawn(async move {
loop {
if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await {
let mempool_cloned = mempool.clone();
task::spawn(async move {
match mempool::JDsMempool::add_tx_data_to_mempool(
mempool_cloned,
add_transactions_to_mempool,
)
.await
{
Ok(_) => (),
Err(err) => {
// TODO
// here there should be a better error management
mempool::error::handle_error(&err);
}
}
});
}
}
});

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from jds", downstream_id);
}
}
}
}
}

pub fn get_coinbase_output(config: &Configuration) -> Result<Vec<TxOut>, Error> {
let mut result = Vec::new();
for coinbase_output_pool in &config.coinbase_outputs {
Expand Down Expand Up @@ -55,6 +235,15 @@ pub struct CoinbaseOutput {
output_script_value: String,
}

impl CoinbaseOutput {
pub fn new(output_script_type: String, output_script_value: String) -> Self {
Self {
output_script_type,
output_script_value,
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct Configuration {
#[serde(default = "default_true")]
Expand All @@ -72,6 +261,51 @@ pub struct Configuration {
pub mempool_update_interval: Duration,
}

#[derive(Debug, Deserialize, Clone)]
pub struct CoreRpc {
url: String,
port: u16,
user: String,
pass: String,
}

impl CoreRpc {
pub fn new(url: String, port: u16, user: String, pass: String) -> Self {
Self {
url,
port,
user,
pass,
}
}
}

impl Configuration {
pub fn new(
listen_jd_address: String,
authority_public_key: Secp256k1PublicKey,
authority_secret_key: Secp256k1SecretKey,
cert_validity_sec: u64,
coinbase_outputs: Vec<CoinbaseOutput>,
core_rpc: CoreRpc,
mempool_update_interval: Duration,
) -> Self {
Self {
async_mining_allowed: true,
listen_jd_address,
authority_public_key,
authority_secret_key,
cert_validity_sec,
coinbase_outputs,
core_rpc_url: core_rpc.url,
core_rpc_port: core_rpc.port,
core_rpc_user: core_rpc.user,
core_rpc_pass: core_rpc.pass,
mempool_update_interval,
}
}
}

fn default_true() -> bool {
true
}
Expand Down
Loading

0 comments on commit 5347351

Please sign in to comment.