Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaces PR #727 #733

Merged
merged 32 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1b9c51d
Merge pull request #11 from stratum-mining/dev
GitGab19 Dec 18, 2023
023984b
Merge branch 'stratum-mining:dev' into dev
GitGab19 Jan 10, 2024
9cec29c
Merge branch 'stratum-mining:dev' into dev
GitGab19 Jan 12, 2024
3dcd85d
automatic diff management enhancement + fix #712
GitGab19 Jan 22, 2024
2fabdfe
invalid-job-id error added
GitGab19 Jan 22, 2024
08c49fb
fix
GitGab19 Jan 22, 2024
764ae99
'invalid-job-id' added into SubmitShares.Error doc
GitGab19 Jan 23, 2024
4100000
Fix issue #730
GitGab19 Jan 23, 2024
9251f0e
Mempool update timout management
GitGab19 Jan 23, 2024
6029ad7
fmt fix
GitGab19 Jan 23, 2024
88c7c41
clippy fix
GitGab19 Jan 23, 2024
12b3e05
MG fixes
GitGab19 Jan 23, 2024
e67a00e
panic if TP is not responding
GitGab19 Jan 23, 2024
bdd1064
fmt fixes
GitGab19 Jan 23, 2024
2519367
clippy fix
GitGab19 Jan 23, 2024
583d23a
last changes
GitGab19 Jan 24, 2024
8db76f8
automatic diff management enhancement + fix #712
GitGab19 Jan 22, 2024
798b842
invalid-job-id error added
GitGab19 Jan 22, 2024
d94fa27
fix
GitGab19 Jan 22, 2024
90158fa
'invalid-job-id' added into SubmitShares.Error doc
GitGab19 Jan 23, 2024
6362640
Fix issue #730
GitGab19 Jan 23, 2024
c365c88
Mempool update timout management
GitGab19 Jan 23, 2024
2c70690
fmt fix
GitGab19 Jan 23, 2024
180db6d
clippy fix
GitGab19 Jan 23, 2024
a14c364
MG fixes
GitGab19 Jan 23, 2024
788481b
panic if TP is not responding
GitGab19 Jan 23, 2024
78f5208
fmt fixes
GitGab19 Jan 23, 2024
e13d166
clippy fix
GitGab19 Jan 23, 2024
4b5f3df
last changes
GitGab19 Jan 24, 2024
b4f62d3
Merge branch 'dev' of https://github.com/GitGab19/stratum into dev
GitGab19 Jan 25, 2024
b1f815b
last fixes
GitGab19 Jan 25, 2024
22f43fe
last changes
GitGab19 Jan 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions protocols/v2/roles-logic-sv2/src/channel_logic/channel_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,21 @@ impl ProxyExtendedChannelFactory {
.clone()
.ok_or(Error::ShareDoNotMatchAnyJob)?
.0;

if referenced_job.job_id != m.job_id {
let error = SubmitSharesError {
channel_id: m.channel_id,
sequence_number: m.sequence_number,
// Infallible unwrap we already know the len of the error code (is a
// static string)
error_code: SubmitSharesError::invalid_job_id_error_code()
.to_string()
.try_into()
.unwrap(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment that exaplain why the unwrap is safe

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

Going forward, IMO we should try to keep some discipline around the usage of unwrap, because we are getting some criticism from the community.

for example: #723 (comment)

};
return Ok(OnNewShare::SendErrorDownstream(error));
}

if let Some(job_creator) = self.job_creator.as_mut() {
let template_id = job_creator
.get_template_id_from_job(referenced_job.job_id)
Expand Down
4 changes: 4 additions & 0 deletions protocols/v2/subprotocols/mining/src/submit_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct SubmitSharesSuccess {
/// * ‘invalid-channel-id’
/// * ‘stale-share’
/// * ‘difficulty-too-low’
/// * 'invalid-job-id'
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SubmitSharesError<'decoder> {
pub channel_id: u32,
Expand All @@ -109,6 +110,9 @@ impl<'a> SubmitSharesError<'a> {
pub fn difficulty_too_low_error_code() -> &'static str {
"difficulty-too-low"
}
pub fn invalid_job_id_error_code() -> &'static str {
"invalid-job-id"
}
}
#[cfg(feature = "with_serde")]
use binary_sv2::GetSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ core_rpc_url = "http://127.0.0.1"
core_rpc_port = 18332
core_rpc_user = "username"
core_rpc_pass = "password"
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
4 changes: 4 additions & 0 deletions roles/jd-server/config-examples/jds-config-local-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ core_rpc_url = "http://127.0.0.1"
core_rpc_port = 18332
core_rpc_user = "username"
core_rpc_pass = "password"
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
10 changes: 10 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{

use roles_logic_sv2::parsers::Mining;

use crate::mempool::JdsMempoolError;

#[derive(std::fmt::Debug)]
pub enum JdsError {
Io(std::io::Error),
Expand All @@ -19,6 +21,7 @@ pub enum JdsError {
PoisonLock(String),
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
}

impl std::fmt::Display for JdsError {
Expand All @@ -38,6 +41,7 @@ impl std::fmt::Display for JdsError {
Sv2ProtocolError(ref e) => {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
}
}
}
Expand Down Expand Up @@ -106,3 +110,9 @@ impl From<(u32, Mining<'static>)> for JdsError {
JdsError::Sv2ProtocolError(e)
}
}

impl From<JdsMempoolError> for JdsError {
fn from(error: JdsMempoolError) -> Self {
JdsError::MempoolError(error)
}
}
13 changes: 12 additions & 1 deletion roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use serde::{Deserialize, Serialize};
use std::{convert::TryInto, sync::Arc};
use stratum_common::{bitcoin, bitcoin::hash_types::Txid};

use self::rpc_client::BitcoincoreRpcError;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Hash([u8; 32]);

Expand Down Expand Up @@ -57,7 +59,9 @@ impl JDsMempool {
.ok_or(JdsMempoolError::NoClient)?;
let new_mempool: Result<Vec<TransacrtionWithHash>, JdsMempoolError> =
tokio::task::spawn(async move {
let mempool: Vec<String> = client.get_raw_mempool_verbose().unwrap();
let mempool: Result<Vec<String>, BitcoincoreRpcError> =
client.get_raw_mempool_verbose();
let mempool = mempool.map_err(JdsMempoolError::BitcoinCoreRpcError)?;
for id in &mempool {
let tx: Result<Transaction, _> = client.get_raw_transaction(id, None);
if let Ok(tx) = tx {
Expand Down Expand Up @@ -106,4 +110,11 @@ impl JDsMempool {
pub enum JdsMempoolError {
EmptyMempool,
NoClient,
BitcoinCoreRpcError(BitcoincoreRpcError),
}

impl From<BitcoincoreRpcError> for JdsMempoolError {
fn from(error: BitcoincoreRpcError) -> Self {
JdsMempoolError::BitcoinCoreRpcError(error)
}
}
36 changes: 35 additions & 1 deletion roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use roles_logic_sv2::{
errors::Error, parsers::PoolMessages as JdsMessages, utils::CoinbaseOutput as CoinbaseOutput_,
};
use serde::Deserialize;
use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
time::Duration,
};
use stratum_common::bitcoin::{Script, TxOut};

pub type Message = JdsMessages<'static>;
Expand Down Expand Up @@ -63,4 +66,35 @@ pub struct Configuration {
pub core_rpc_port: u16,
pub core_rpc_user: String,
pub core_rpc_pass: String,
#[serde(deserialize_with = "duration_from_toml")]
pub mempool_update_timeout: Duration,
}

fn duration_from_toml<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct Helper {
unit: String,
value: u64,
}

let helper = Helper::deserialize(deserializer)?;
match helper.unit.as_str() {
"seconds" => Ok(Duration::from_secs(helper.value)),
"secs" => Ok(Duration::from_secs(helper.value)),
"s" => Ok(Duration::from_secs(helper.value)),
"milliseconds" => Ok(Duration::from_millis(helper.value)),
"millis" => Ok(Duration::from_millis(helper.value)),
"ms" => Ok(Duration::from_millis(helper.value)),
"microseconds" => Ok(Duration::from_micros(helper.value)),
"micros" => Ok(Duration::from_micros(helper.value)),
"us" => Ok(Duration::from_micros(helper.value)),
"nanoseconds" => Ok(Duration::from_nanos(helper.value)),
"nanos" => Ok(Duration::from_nanos(helper.value)),
"ns" => Ok(Duration::from_nanos(helper.value)),
// ... add other units as needed
_ => Err(serde::de::Error::custom("Unsupported duration unit")),
}
}
10 changes: 10 additions & 0 deletions roles/jd-server/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ async fn send_status(
.await
.unwrap_or(());
}
JdsError::MempoolError(_) => {
tx.send(Status {
state: State::TemplateProviderShutdown(e),
})
.await
.unwrap_or(());
}
_ => {
let string_err = e.to_string();
tx.send(Status {
Expand Down Expand Up @@ -111,5 +118,8 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
JdsError::Sv2ProtocolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
JdsError::MempoolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
}
}
64 changes: 58 additions & 6 deletions roles/jd-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,52 @@
#![allow(special_module_name)]
/* #![allow(special_module_name)]
use crate::lib::{mempool, status, Configuration};
use async_channel::unbounded;
use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use roles_logic_sv2::{
errors::Error, parsers::PoolMessages as JdsMessages, utils::CoinbaseOutput as CoinbaseOutput_,
};
use serde::Deserialize;
use std::convert::{TryFrom, TryInto};

use error_handling::handle_result;
use stratum_common::bitcoin::{Script, TxOut};
use tracing::{error, info, warn};

//use crate::lib::mempool;
use roles_logic_sv2::utils::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::{select, task};

use crate::{lib::job_declarator::JobDeclarator, status::Status};

#[derive(Debug, Deserialize, Clone)]
pub struct CoinbaseOutput {
output_script_type: String,
output_script_value: String,
}

#[derive(Debug, Deserialize, Clone)]
pub struct Configuration {
pub listen_jd_address: String,
pub authority_public_key: Secp256k1PublicKey,
pub authority_secret_key: Secp256k1SecretKey,
pub cert_validity_sec: u64,
pub coinbase_outputs: Vec<CoinbaseOutput>,
pub core_rpc_url: String,
pub core_rpc_port: u16,
pub core_rpc_user: String,
pub core_rpc_pass: String,
#[serde(deserialize_with = "duration_from_toml")]
pub mempool_update_timeout: Duration,
} */
#![allow(special_module_name)]
use crate::lib::{mempool, status, Configuration};
use async_channel::unbounded;
use error_handling::handle_result;
use roles_logic_sv2::utils::Mutex;
use std::sync::Arc;
use tokio::{select, task};
use tracing::{error, info, warn};
mod lib;

Expand Down Expand Up @@ -106,22 +149,31 @@ async fn main() {
username,
password,
)));
let mempool_update_timeout = config.mempool_update_timeout;
let mempool_cloned_ = mempool.clone();
let (status_tx, status_rx) = unbounded();
let sender = status::Sender::Downstream(status_tx.clone());
if url.contains("http") {
let sender_clone = sender.clone();
task::spawn(async move {
loop {
let _ = mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
// TODO this should be configurable by the user
tokio::time::sleep(Duration::from_millis(10000)).await;
let updated_mempool =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
if let Err(err) = updated_mempool {
error!("{:?}", err);
error!("Unable to connect to Template Provider (possible reasons: not fully synced, down)");
handle_result!(sender_clone, Err(err));
}
tokio::time::sleep(mempool_update_timeout).await;
}
});
};

let (status_tx, status_rx) = unbounded();
//let (status_tx, status_rx) = unbounded();
info!("Jds INITIALIZING with config: {:?}", &args.config_path);

let cloned = config.clone();
let sender = status::Sender::Downstream(status_tx.clone());

let mempool_cloned = mempool.clone();
task::spawn(async move { JobDeclarator::start(cloned, sender, mempool_cloned).await });

Expand Down
29 changes: 21 additions & 8 deletions roles/translator/src/lib/downstream_sv1/diff_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use super::{Downstream, DownstreamMessages, SetDownstreamTarget};
use super::super::error::{Error, ProxyResult};
use roles_logic_sv2::utils::Mutex;
use std::{ops::Div, sync::Arc};
use tracing::error;
use v1::json_rpc;

use stratum_common::bitcoin::util::uint::Uint256;
Expand Down Expand Up @@ -212,7 +211,7 @@ impl Downstream {
}

let delta_time = timestamp_secs - d.difficulty_mgmt.timestamp_of_last_update;
if delta_time == 0 {
if delta_time <= 15 {
return Ok(None);
}
tracing::debug!("\nDELTA TIME: {:?}", delta_time);
Expand All @@ -225,7 +224,7 @@ impl Downstream {
) {
Ok(hashrate) => hashrate as f32,
Err(e) => {
error!("{:?} -> Probably min_individual_miner_hashrate parameter was not set properly in config file. New hashrate will be automatically adjusted to match the real one.", e);
tracing::debug!("{:?} -> Probably min_individual_miner_hashrate parameter was not set properly in config file. New hashrate will be automatically adjusted to match the real one.", e);
d.difficulty_mgmt.min_individual_miner_hashrate * realized_share_per_min as f32 / d.difficulty_mgmt.shares_per_minute
}
};
Expand All @@ -244,21 +243,35 @@ impl Downstream {
|| (hashrate_delta_percentage >= 30.0) && (delta_time >= 240)
|| (hashrate_delta_percentage >= 15.0) && (delta_time >= 300)
{
if realized_share_per_min < 0.01 {
// realized_share_per_min is 0.0 when d.difficulty_mgmt.submits_since_last_update is 0
// so it's safe to compare realized_share_per_min with == 0.0
if realized_share_per_min == 0.0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a float are you sure that is safe to change from < to ==

If yes write a comment that explain why

new_miner_hashrate = match delta_time {
dt if dt < 30 => d.difficulty_mgmt.min_individual_miner_hashrate / 2.0,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate / 3.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate / 5.0,
dt if dt <= 30 => d.difficulty_mgmt.min_individual_miner_hashrate / 1.5,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate / 2.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate / 3.0,
};
hashrate_delta =
new_miner_hashrate - d.difficulty_mgmt.min_individual_miner_hashrate;
}
if (realized_share_per_min > 0.0) && (hashrate_delta_percentage > 1000.0) {
new_miner_hashrate = match delta_time {
dt if dt <= 30 => d.difficulty_mgmt.min_individual_miner_hashrate * 10.0,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate * 5.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate * 3.0,
};
hashrate_delta =
new_miner_hashrate - d.difficulty_mgmt.min_individual_miner_hashrate;
}
d.difficulty_mgmt.min_individual_miner_hashrate = new_miner_hashrate;
d.difficulty_mgmt.timestamp_of_last_update = timestamp_secs;
d.difficulty_mgmt.submits_since_last_update = 0;
// update channel hashrate (read by upstream)
d.upstream_difficulty_config.super_safe_lock(|c| {
if c.channel_nominal_hashrate + hashrate_delta > 0.0 {
c.channel_nominal_hashrate += hashrate_delta;
} else {
c.channel_nominal_hashrate = 0.0;
}
});
Ok(Some(new_miner_hashrate))
} else {
Expand Down
1 change: 0 additions & 1 deletion roles/translator/src/lib/proxy/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl Bridge {
"Submit share error {:?}",
std::str::from_utf8(&e.error_code.to_vec()[..])
);
error!("Make sure to set `min_individual_miner_hashrate` in the config file");
}
Ok(Ok(OnNewShare::SendSubmitShareUpstream((share, _)))) => {
info!("SHARE MEETS UPSTREAM TARGET");
Expand Down
4 changes: 4 additions & 0 deletions test/config/interop-jd-change-upstream/jds-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
4 changes: 4 additions & 0 deletions test/config/interop-jd-translator/jds-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
Loading