Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MovePoolSv2 lib code out of main.rs #1097

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions roles/pool/src/lib/mining_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ pub struct CoinbaseOutput {
output_script_value: String,
}

impl CoinbaseOutput {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This struct appears all over. I'll create an issue from this comment to consolidate to a location in the code base that makes sense.
#1066 (comment)
#1066 (comment)
#1066 (comment)

pub fn new(output_script_type: String, output_script_value: String) -> Self {
Self {
output_script_type,
output_script_value,
}
}
}

impl TryFrom<&CoinbaseOutput> for CoinbaseOutput_ {
type Error = Error;

Expand Down Expand Up @@ -96,6 +105,73 @@ pub struct Configuration {
pub test_only_listen_adress_plain: String,
}

pub struct TemplateProviderConfig {
address: String,
authority_public_key: Option<Secp256k1PublicKey>,
}

impl TemplateProviderConfig {
pub fn new(address: String, authority_public_key: Option<Secp256k1PublicKey>) -> Self {
Self {
address,
authority_public_key,
}
}
}

pub struct AuthorityConfig {
pub public_key: Secp256k1PublicKey,
pub secret_key: Secp256k1SecretKey,
}

impl AuthorityConfig {
pub fn new(public_key: Secp256k1PublicKey, secret_key: Secp256k1SecretKey) -> Self {
Self {
public_key,
secret_key,
}
}
}

pub struct ConnectionConfig {
listen_address: String,
cert_validity_sec: u64,
signature: String,
}

impl ConnectionConfig {
pub fn new(listen_address: String, cert_validity_sec: u64, signature: String) -> Self {
Self {
listen_address,
cert_validity_sec,
signature,
}
}
}

impl Configuration {
pub fn new(
plebhash marked this conversation as resolved.
Show resolved Hide resolved
pool_connection: ConnectionConfig,
template_provider: TemplateProviderConfig,
authority_config: AuthorityConfig,
coinbase_outputs: Vec<CoinbaseOutput>,
#[cfg(feature = "test_only_allow_unencrypted")] test_only_listen_adress_plain: String,
) -> Self {
Self {
listen_address: pool_connection.listen_address,
tp_address: template_provider.address,
tp_authority_public_key: template_provider.authority_public_key,
authority_public_key: authority_config.public_key,
authority_secret_key: authority_config.secret_key,
cert_validity_sec: pool_connection.cert_validity_sec,
coinbase_outputs,
pool_signature: pool_connection.signature,
#[cfg(feature = "test_only_allow_unencrypted")]
test_only_listen_adress_plain,
}
}
}

#[derive(Debug)]
pub struct Downstream {
// Either group or channel id
Expand Down
108 changes: 108 additions & 0 deletions roles/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,111 @@ pub mod error;
pub mod mining_pool;
pub mod status;
pub mod template_receiver;

use async_channel::{bounded, unbounded};

use mining_pool::{get_coinbase_output, Configuration, Pool};
use template_receiver::TemplateRx;
use tracing::{error, info, warn};

use tokio::select;

pub struct PoolSv2 {
config: Configuration,
}

impl PoolSv2 {
pub fn new(config: Configuration) -> PoolSv2 {
PoolSv2 { config }
}
pub async fn start(self) {
let config = self.config.clone();
let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: naming conventions for channels should be consistent with the rest of the code base.

let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get Coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
}
}
101 changes: 3 additions & 98 deletions roles/pool/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
#![allow(special_module_name)]
use async_channel::{bounded, unbounded};

use tracing::{error, info, warn};
mod lib;
use lib::{
mining_pool::{get_coinbase_output, Configuration, Pool},
status,
template_receiver::TemplateRx,
};

use ext_config::{Config, File, FileFormat};
use tokio::select;
pub use lib::{mining_pool::Configuration, status, PoolSv2};
use tracing::error;

mod args {
use std::path::PathBuf;
Expand Down Expand Up @@ -106,93 +99,5 @@ async fn main() {
return;
}
};

let (status_tx, status_rx) = unbounded();
let (s_new_t, r_new_t) = bounded(10);
let (s_prev_hash, r_prev_hash) = bounded(10);
let (s_solution, r_solution) = bounded(10);
let (s_message_recv_signal, r_message_recv_signal) = bounded(10);
info!("Pool INITIALIZING with config: {:?}", &args.config_path);
let coinbase_output_result = get_coinbase_output(&config);
let coinbase_output_len = match coinbase_output_result {
Ok(coinbase_output) => coinbase_output.len() as u32,
Err(err) => {
error!("Failed to get coinbase output: {:?}", err);
return;
}
};
let tp_authority_public_key = config.tp_authority_public_key;
let template_rx_res = TemplateRx::connect(
config.tp_address.parse().unwrap(),
s_new_t,
s_prev_hash,
r_solution,
r_message_recv_signal,
status::Sender::Upstream(status_tx.clone()),
coinbase_output_len,
tp_authority_public_key,
)
.await;

if let Err(e) = template_rx_res {
error!("Could not connect to Template Provider: {}", e);
return;
}

let pool = Pool::start(
config.clone(),
r_new_t,
r_prev_hash,
s_solution,
s_message_recv_signal,
status::Sender::DownstreamListener(status_tx),
);

// Start the error handling loop
// See `./status.rs` and `utils/error_handling` for information on how this operates
loop {
let task_status = select! {
task_status = status_rx.recv() => task_status,
interrupt_signal = tokio::signal::ctrl_c() => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
break;
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!(
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
err
);
break;
}
status::State::TemplateProviderShutdown(err) => {
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
status::State::DownstreamInstanceDropped(downstream_id) => {
warn!("Dropping downstream instance {} from pool", downstream_id);
if pool
.safe_lock(|p| p.remove_downstream(downstream_id))
.is_err()
{
break;
}
}
}
}
PoolSv2::new(config).start().await;
}
Loading