Skip to content

Commit

Permalink
Move code out of main.rs to lib
Browse files Browse the repository at this point in the history
..This would allow us to use the crate in other enviornements.
  • Loading branch information
jbesraa committed Aug 23, 2024
1 parent 0f0ee1e commit d4a4470
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 98 deletions.
76 changes: 76 additions & 0 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ pub struct CoinbaseOutput {
output_script_value: String,
}

impl CoinbaseOutput {
pub fn new(output_script_type: String, output_script_value: String) -> Self {
Self {
output_script_type,
output_script_value,
}
}
}

impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {
type Error = Error;

Expand Down Expand Up @@ -96,6 +105,73 @@ pub struct Configuration {
pub test_only_listen_adress_plain: String,
}

pub struct TemplateProviderConfig {
address: String,
authority_public_key: Option<Secp256k1PublicKey>,
}

impl TemplateProviderConfig {
pub fn new(address: String, authority_public_key: Option<Secp256k1PublicKey>) -> Self {
Self {
address,
authority_public_key,
}
}
}

pub struct AuthorityConfig {
pub public_key: Secp256k1PublicKey,
pub secret_key: Secp256k1SecretKey,
}

impl AuthorityConfig {
pub fn new(public_key: Secp256k1PublicKey, secret_key: Secp256k1SecretKey) -> Self {
Self {
public_key,
secret_key,
}
}
}

pub struct ConnectionConfig {
listen_address: String,
cert_validity_sec: u64,
signature: String,
}

impl ConnectionConfig {
pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self {
Self {
listen_address,
cert_validity_sec,
signature,
}
}
}

impl Configuration {
pub fn new(
pool_connection: ConnectionConfig,
template_provider: TemplateProviderConfig,
authority_config: AuthorityConfig,
coinbase_outputs: Vec<CoinbaseOutput>,
#[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String,
) -> Self {
Self {
listen_address: pool_connection.listen_address,
tp_address: template_provider.address,
tp_authority_public_key: template_provider.authority_public_key,
authority_public_key: authority_config.public_key,
authority_secret_key: authority_config.secret_key,
cert_validity_sec: pool_connection.cert_validity_sec,
coinbase_outputs,
pool_signature: pool_connection.signature,
#[cfg(feature = "test_only_allow_unencrypted")]
test_only_listen_adress_plain,
}
}
}

#[derive(Debug)]
pub struct Downstream {
// Either group or channel id
Expand Down
108 changes: 108 additions & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,111 @@ pub mod error;
pub mod mining_pool;
pub mod status;
pub mod template_receiver;

use async_channel::{bounded, unbounded};

use mining_pool::{get_coinbase_output, Configuration, Pool};
use template_receiver::TemplateRx;
use tracing::{error, info, warn};

use tokio::select;

pub struct PoolSv2 {
config: Configuration,
}

impl PoolSv2 {
pub fn new(config: Configuration) -> PoolSv2 {
PoolSv2 { config }
}
pub async fn start(self) {
let config = self.config.clone();
let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get Coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
}
}
102 changes: 4 additions & 98 deletions roles/pool/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
#![allow(special_module_name)]
use async_channel::{bounded, unbounded};

use tracing::{error, info, warn};
mod lib;
use lib::{
mining_pool::{get_coinbase_output, Configuration, Pool},
status,
template_receiver::TemplateRx,
};

use ext_config::{Config, File, FileFormat};
use tokio::select;
pub use lib::status;
use lib::{mining_pool::Configuration, PoolSv2};
use tracing::error;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -106,93 +100,5 @@ async fn main() {
return;
}
};

let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
info!("Pool INITIALIZING with config: {:?}", &args.config_path);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
PoolSv2::new(config).start().await;
}

0 comments on commit d4a4470

Please sign in to comment.