Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stratum-cli project #1115

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions roles/translator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ repository = "https://github.com/stratum-mining/stratum"
name = "translator_sv2"
path = "src/lib/mod.rs"

[[bin]]
name = "translator_sv2"
path = "src/main.rs"

[dependencies]
stratum-common = { version = "1.0.0", path = "../../common" }
async-channel = "1.5.1"
Expand Down
265 changes: 265 additions & 0 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,272 @@
use async_channel::{bounded, unbounded};
use futures::FutureExt;
use rand::Rng;
pub use roles_logic_sv2::utils::Mutex;
use status::Status;
use std::{
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration,
};

use tokio::{
sync::broadcast,
task::{self, AbortHandle},
};
use tracing::{debug, error, info, warn};
pub use v1::server_to_client;

use proxy_config::ProxyConfig;

use crate::status::State;

pub mod downstream_sv1;
pub mod error;
pub mod proxy;
pub mod proxy_config;
pub mod status;
pub mod upstream_sv2;
pub mod utils;

#[derive(Clone, Debug)]
pub struct TranslatorSv2 {
config: ProxyConfig,
}

impl TranslatorSv2 {
pub fn new(config: ProxyConfig) -> Self {
Self { config }
}

pub async fn start(self) {
let (tx_status, rx_status) = unbounded();

let target = Arc::new(Mutex::new(vec![0; 32]));

// Sender/Receiver to send SV1 `mining.notify` message from the `Bridge` to the `Downstream`
let (tx_sv1_notify, _rx_sv1_notify): (
broadcast::Sender<server_to_client::Notify>,
broadcast::Receiver<server_to_client::Notify>,
) = broadcast::channel(10);

let task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>> =
Arc::new(Mutex::new(Vec::new()));

self.internal_start(
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
task_collector.clone(),
)
.await;

debug!("Starting up signal listener");
let task_collector_ = task_collector.clone();

debug!("Starting up status listener");
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = tokio::select! {
task_status = rx_status.recv().fuse() => task_status,
};
let task_status: Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
State::BridgeShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
break;
}
State::UpstreamTryReconnect(err) => {
error!("SHUTDOWN from: {}", err);

// wait a random amount of time between 0 and 3000ms
// if all the downstreams try to reconnect at the same time, the upstream may fail
let mut rng = rand::thread_rng();
let wait_time = rng.gen_range(0..=3000);
tokio::time::sleep(Duration::from_millis(wait_time)).await;

// kill al the tasks
let task_collector_aborting = task_collector_.clone();
kill_tasks(task_collector_aborting.clone());

warn!("Trying reconnecting to upstream");
self.internal_start(
tx_sv1_notify.clone(),
target.clone(),
tx_status.clone(),
task_collector_.clone(),
)
.await;
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
}
}

async fn internal_start(
&self,
tx_sv1_notify: broadcast::Sender<server_to_client::Notify<'static>>,
target: Arc<Mutex<Vec<u8>>>,
tx_status: async_channel::Sender<Status<'static>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let proxy_config = self.config.clone();
// Sender/Receiver to send a SV2 `SubmitSharesExtended` from the `Bridge` to the `Upstream`
// (Sender<SubmitSharesExtended<'static>>, Receiver<SubmitSharesExtended<'static>>)
let (tx_sv2_submit_shares_ext, rx_sv2_submit_shares_ext) = bounded(10);

// `tx_sv1_bridge` sender is used by `Downstream` to send a `DownstreamMessages` message to
// `Bridge` via the `rx_sv1_downstream` receiver
// (Sender<downstream_sv1::DownstreamMessages>, Receiver<downstream_sv1::DownstreamMessages>)
let (tx_sv1_bridge, rx_sv1_downstream) = unbounded();

// Sender/Receiver to send a SV2 `NewExtendedMiningJob` message from the `Upstream` to the
// `Bridge`
// (Sender<NewExtendedMiningJob<'static>>, Receiver<NewExtendedMiningJob<'static>>)
let (tx_sv2_new_ext_mining_job, rx_sv2_new_ext_mining_job) = bounded(10);

// Sender/Receiver to send a new extranonce from the `Upstream` to this `main` function to be
// passed to the `Downstream` upon a Downstream role connection
// (Sender<ExtendedExtranonce>, Receiver<ExtendedExtranonce>)
let (tx_sv2_extranonce, rx_sv2_extranonce) = bounded(1);

// Sender/Receiver to send a SV2 `SetNewPrevHash` message from the `Upstream` to the `Bridge`
// (Sender<SetNewPrevHash<'static>>, Receiver<SetNewPrevHash<'static>>)
let (tx_sv2_set_new_prev_hash, rx_sv2_set_new_prev_hash) = bounded(10);

// Format `Upstream` connection address
let upstream_addr = SocketAddr::new(
IpAddr::from_str(&proxy_config.upstream_address)
.expect("Failed to parse upstream address!"),
proxy_config.upstream_port,
);

let diff_config = Arc::new(Mutex::new(proxy_config.upstream_difficulty_config.clone()));
let task_collector_upstream = task_collector.clone();
// Instantiate a new `Upstream` (SV2 Pool)
let upstream = match upstream_sv2::Upstream::new(
upstream_addr,
proxy_config.upstream_authority_pubkey,
rx_sv2_submit_shares_ext,
tx_sv2_set_new_prev_hash,
tx_sv2_new_ext_mining_job,
proxy_config.min_extranonce2_size,
tx_sv2_extranonce,
status::Sender::Upstream(tx_status.clone()),
target.clone(),
diff_config.clone(),
task_collector_upstream,
)
.await
{
Ok(upstream) => upstream,
Err(e) => {
error!("Failed to create upstream: {}", e);
return;
}
};
let task_collector_init_task = task_collector.clone();
// Spawn a task to do all of this init work so that the main thread
// can listen for signals and failures on the status channel. This
// allows for the tproxy to fail gracefully if any of these init tasks
//fail
let task = task::spawn(async move {
// Connect to the SV2 Upstream role
match upstream_sv2::Upstream::connect(
upstream.clone(),
proxy_config.min_supported_version,
proxy_config.max_supported_version,
)
.await
{
Ok(_) => info!("Connected to Upstream!"),
Err(e) => {
error!("Failed to connect to Upstream EXITING! : {}", e);
return;
}
}

// Start receiving messages from the SV2 Upstream role
if let Err(e) = upstream_sv2::Upstream::parse_incoming(upstream.clone()) {
error!("failed to create sv2 parser: {}", e);
return;
}

debug!("Finished starting upstream listener");
// Start task handler to receive submits from the SV1 Downstream role once it connects
if let Err(e) = upstream_sv2::Upstream::handle_submit(upstream.clone()) {
error!("Failed to create submit handler: {}", e);
return;
}

// Receive the extranonce information from the Upstream role to send to the Downstream role
// once it connects also used to initialize the bridge
let (extended_extranonce, up_id) = rx_sv2_extranonce.recv().await.unwrap();
loop {
let target: [u8; 32] = target.safe_lock(|t| t.clone()).unwrap().try_into().unwrap();
if target != [0; 32] {
break;
};
async_std::task::sleep(std::time::Duration::from_millis(100)).await;
}

let task_collector_bridge = task_collector_init_task.clone();
// Instantiate a new `Bridge` and begins handling incoming messages
let b = proxy::Bridge::new(
rx_sv1_downstream,
tx_sv2_submit_shares_ext,
rx_sv2_set_new_prev_hash,
rx_sv2_new_ext_mining_job,
tx_sv1_notify.clone(),
status::Sender::Bridge(tx_status.clone()),
extended_extranonce,
target,
up_id,
task_collector_bridge,
);
proxy::Bridge::start(b.clone());

// Format `Downstream` connection address
let downstream_addr = SocketAddr::new(
IpAddr::from_str(&proxy_config.downstream_address).unwrap(),
proxy_config.downstream_port,
);

let task_collector_downstream = task_collector_init_task.clone();
// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices)
downstream_sv1::Downstream::accept_connections(
downstream_addr,
tx_sv1_bridge,
tx_sv1_notify,
status::Sender::DownstreamListener(tx_status.clone()),
b,
proxy_config.downstream_difficulty_config,
diff_config,
task_collector_downstream,
);
}); // End of init task
let _ =
task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string())));
}
}

fn kill_tasks(task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>) {
let _ = task_collector.safe_lock(|t| {
while let Some(handle) = t.pop() {
handle.0.abort();
warn!("Killed task: {:?}", handle.1);
}
});
}
67 changes: 67 additions & 0 deletions roles/translator/src/lib/proxy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,42 @@ pub struct ProxyConfig {
pub upstream_difficulty_config: UpstreamDifficultyConfig,
}

pub struct TranslatorProxyUpstream {
address: String,
port: u16,
authority_pubkey: Secp256k1PublicKey,
difficulty_config: UpstreamDifficultyConfig,
}

pub struct TranslatorProxyDownstream {
address: String,
port: u16,
difficulty_config: DownstreamDifficultyConfig,
}

impl ProxyConfig {
pub fn new(
upstream: TranslatorProxyUpstream,
downstream: TranslatorProxyDownstream,
max_supported_version: u16,
min_supported_version: u16,
min_extranonce2_size: u16,
) -> Self {
Self {
upstream_address: upstream.address,
upstream_port: upstream.port,
upstream_authority_pubkey: upstream.authority_pubkey,
downstream_address: downstream.address,
downstream_port: downstream.port,
max_supported_version,
min_supported_version,
min_extranonce2_size,
downstream_difficulty_config: downstream.difficulty_config,
upstream_difficulty_config: upstream.difficulty_config,
}
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct DownstreamDifficultyConfig {
pub min_individual_miner_hashrate: f32,
Expand All @@ -25,6 +61,21 @@ pub struct DownstreamDifficultyConfig {
pub timestamp_of_last_update: u64,
}

impl DownstreamDifficultyConfig {
pub fn new(
min_individual_miner_hashrate: f32,
shares_per_minute: f32,
submits_since_last_update: u32,
timestamp_of_last_update: u64,
) -> Self {
Self {
min_individual_miner_hashrate,
shares_per_minute,
submits_since_last_update,
timestamp_of_last_update,
}
}
}
impl PartialEq for DownstreamDifficultyConfig {
fn eq(&self, other: &Self) -> bool {
other.min_individual_miner_hashrate.round() as u32
Expand All @@ -41,3 +92,19 @@ pub struct UpstreamDifficultyConfig {
#[serde(default = "bool::default")]
pub should_aggregate: bool,
}

impl UpstreamDifficultyConfig {
pub fn new(
channel_diff_update_interval: u32,
channel_nominal_hashrate: f32,
timestamp_of_last_update: u64,
should_aggregate: bool,
) -> Self {
Self {
channel_diff_update_interval,
channel_nominal_hashrate,
timestamp_of_last_update,
should_aggregate,
}
}
}
Loading
Loading