Skip to content

Commit

Permalink
Merge pull request #733 from GitGab19/dev
Browse files Browse the repository at this point in the history
Replaces PR #727
  • Loading branch information
Fi3 committed Jan 26, 2024
2 parents ddd51ec + 22f43fe commit cd3ab95
Show file tree
Hide file tree
Showing 15 changed files with 189 additions and 17 deletions.
15 changes: 15 additions & 0 deletions protocols/v2/roles-logic-sv2/src/channel_logic/channel_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,21 @@ impl ProxyExtendedChannelFactory {
.clone()
.ok_or(Error::ShareDoNotMatchAnyJob)?
.0;

if referenced_job.job_id != m.job_id {
let error = SubmitSharesError {
channel_id: m.channel_id,
sequence_number: m.sequence_number,
// Infallible unwrap we already know the len of the error code (is a
// static string)
error_code: SubmitSharesError::invalid_job_id_error_code()
.to_string()
.try_into()
.unwrap(),
};
return Ok(OnNewShare::SendErrorDownstream(error));
}

if let Some(job_creator) = self.job_creator.as_mut() {
let template_id = job_creator
.get_template_id_from_job(referenced_job.job_id)
Expand Down
4 changes: 4 additions & 0 deletions protocols/v2/subprotocols/mining/src/submit_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct SubmitSharesSuccess {
/// * ‘invalid-channel-id’
/// * ‘stale-share’
/// * ‘difficulty-too-low’
/// * 'invalid-job-id'
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SubmitSharesError<'decoder> {
pub channel_id: u32,
Expand All @@ -109,6 +110,9 @@ impl<'a> SubmitSharesError<'a> {
pub fn difficulty_too_low_error_code() -> &'static str {
"difficulty-too-low"
}
pub fn invalid_job_id_error_code() -> &'static str {
"invalid-job-id"
}
}
#[cfg(feature = "with_serde")]
use binary_sv2::GetSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ core_rpc_url = "http://127.0.0.1"
core_rpc_port = 18332
core_rpc_user = "username"
core_rpc_pass = "password"
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
4 changes: 4 additions & 0 deletions roles/jd-server/config-examples/jds-config-local-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ core_rpc_url = "http://127.0.0.1"
core_rpc_port = 18332
core_rpc_user = "username"
core_rpc_pass = "password"
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
10 changes: 10 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::{

use roles_logic_sv2::parsers::Mining;

use crate::mempool::JdsMempoolError;

#[derive(std::fmt::Debug)]
pub enum JdsError {
Io(std::io::Error),
Expand All @@ -19,6 +21,7 @@ pub enum JdsError {
PoisonLock(String),
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
}

impl std::fmt::Display for JdsError {
Expand All @@ -38,6 +41,7 @@ impl std::fmt::Display for JdsError {
Sv2ProtocolError(ref e) => {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
}
}
}
Expand Down Expand Up @@ -106,3 +110,9 @@ impl From<(u32, Mining<'static>)> for JdsError {
JdsError::Sv2ProtocolError(e)
}
}

impl From<JdsMempoolError> for JdsError {
fn from(error: JdsMempoolError) -> Self {
JdsError::MempoolError(error)
}
}
13 changes: 12 additions & 1 deletion roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use serde::{Deserialize, Serialize};
use std::{convert::TryInto, sync::Arc};
use stratum_common::{bitcoin, bitcoin::hash_types::Txid};

use self::rpc_client::BitcoincoreRpcError;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Hash([u8; 32]);

Expand Down Expand Up @@ -57,7 +59,9 @@ impl JDsMempool {
.ok_or(JdsMempoolError::NoClient)?;
let new_mempool: Result<Vec<TransacrtionWithHash>, JdsMempoolError> =
tokio::task::spawn(async move {
let mempool: Vec<String> = client.get_raw_mempool_verbose().unwrap();
let mempool: Result<Vec<String>, BitcoincoreRpcError> =
client.get_raw_mempool_verbose();
let mempool = mempool.map_err(JdsMempoolError::BitcoinCoreRpcError)?;
for id in &mempool {
let tx: Result<Transaction, _> = client.get_raw_transaction(id, None);
if let Ok(tx) = tx {
Expand Down Expand Up @@ -106,4 +110,11 @@ impl JDsMempool {
pub enum JdsMempoolError {
EmptyMempool,
NoClient,
BitcoinCoreRpcError(BitcoincoreRpcError),
}

impl From<BitcoincoreRpcError> for JdsMempoolError {
fn from(error: BitcoincoreRpcError) -> Self {
JdsMempoolError::BitcoinCoreRpcError(error)
}
}
36 changes: 35 additions & 1 deletion roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use roles_logic_sv2::{
errors::Error, parsers::PoolMessages as JdsMessages, utils::CoinbaseOutput as CoinbaseOutput_,
};
use serde::Deserialize;
use std::convert::{TryFrom, TryInto};
use std::{
convert::{TryFrom, TryInto},
time::Duration,
};
use stratum_common::bitcoin::{Script, TxOut};

pub type Message = JdsMessages<'static>;
Expand Down Expand Up @@ -63,4 +66,35 @@ pub struct Configuration {
pub core_rpc_port: u16,
pub core_rpc_user: String,
pub core_rpc_pass: String,
#[serde(deserialize_with = "duration_from_toml")]
pub mempool_update_timeout: Duration,
}

fn duration_from_toml<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
struct Helper {
unit: String,
value: u64,
}

let helper = Helper::deserialize(deserializer)?;
match helper.unit.as_str() {
"seconds" => Ok(Duration::from_secs(helper.value)),
"secs" => Ok(Duration::from_secs(helper.value)),
"s" => Ok(Duration::from_secs(helper.value)),
"milliseconds" => Ok(Duration::from_millis(helper.value)),
"millis" => Ok(Duration::from_millis(helper.value)),
"ms" => Ok(Duration::from_millis(helper.value)),
"microseconds" => Ok(Duration::from_micros(helper.value)),
"micros" => Ok(Duration::from_micros(helper.value)),
"us" => Ok(Duration::from_micros(helper.value)),
"nanoseconds" => Ok(Duration::from_nanos(helper.value)),
"nanos" => Ok(Duration::from_nanos(helper.value)),
"ns" => Ok(Duration::from_nanos(helper.value)),
// ... add other units as needed
_ => Err(serde::de::Error::custom("Unsupported duration unit")),
}
}
10 changes: 10 additions & 0 deletions roles/jd-server/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ async fn send_status(
.await
.unwrap_or(());
}
JdsError::MempoolError(_) => {
tx.send(Status {
state: State::TemplateProviderShutdown(e),
})
.await
.unwrap_or(());
}
_ => {
let string_err = e.to_string();
tx.send(Status {
Expand Down Expand Up @@ -111,5 +118,8 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
JdsError::Sv2ProtocolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
JdsError::MempoolError(_) => {
send_status(sender, e, error_handling::ErrorBranch::Break).await
}
}
}
64 changes: 58 additions & 6 deletions roles/jd-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,52 @@
#![allow(special_module_name)]
/* #![allow(special_module_name)]
use crate::lib::{mempool, status, Configuration};
use async_channel::unbounded;
use codec_sv2::{StandardEitherFrame, StandardSv2Frame};
use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey};
use roles_logic_sv2::{
errors::Error, parsers::PoolMessages as JdsMessages, utils::CoinbaseOutput as CoinbaseOutput_,
};
use serde::Deserialize;
use std::convert::{TryFrom, TryInto};
use error_handling::handle_result;
use stratum_common::bitcoin::{Script, TxOut};
use tracing::{error, info, warn};
//use crate::lib::mempool;
use roles_logic_sv2::utils::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::{select, task};
use crate::{lib::job_declarator::JobDeclarator, status::Status};
#[derive(Debug, Deserialize, Clone)]
pub struct CoinbaseOutput {
output_script_type: String,
output_script_value: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct Configuration {
pub listen_jd_address: String,
pub authority_public_key: Secp256k1PublicKey,
pub authority_secret_key: Secp256k1SecretKey,
pub cert_validity_sec: u64,
pub coinbase_outputs: Vec<CoinbaseOutput>,
pub core_rpc_url: String,
pub core_rpc_port: u16,
pub core_rpc_user: String,
pub core_rpc_pass: String,
#[serde(deserialize_with = "duration_from_toml")]
pub mempool_update_timeout: Duration,
} */
#![allow(special_module_name)]
use crate::lib::{mempool, status, Configuration};
use async_channel::unbounded;
use error_handling::handle_result;
use roles_logic_sv2::utils::Mutex;
use std::sync::Arc;
use tokio::{select, task};
use tracing::{error, info, warn};
mod lib;

Expand Down Expand Up @@ -106,22 +149,31 @@ async fn main() {
username,
password,
)));
let mempool_update_timeout = config.mempool_update_timeout;
let mempool_cloned_ = mempool.clone();
let (status_tx, status_rx) = unbounded();
let sender = status::Sender::Downstream(status_tx.clone());
if url.contains("http") {
let sender_clone = sender.clone();
task::spawn(async move {
loop {
let _ = mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
// TODO this should be configurable by the user
tokio::time::sleep(Duration::from_millis(10000)).await;
let updated_mempool =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
if let Err(err) = updated_mempool {
error!("{:?}", err);
error!("Unable to connect to Template Provider (possible reasons: not fully synced, down)");
handle_result!(sender_clone, Err(err));
}
tokio::time::sleep(mempool_update_timeout).await;
}
});
};

let (status_tx, status_rx) = unbounded();
//let (status_tx, status_rx) = unbounded();
info!("Jds INITIALIZING with config: {:?}", &args.config_path);

let cloned = config.clone();
let sender = status::Sender::Downstream(status_tx.clone());

let mempool_cloned = mempool.clone();
task::spawn(async move { JobDeclarator::start(cloned, sender, mempool_cloned).await });

Expand Down
29 changes: 21 additions & 8 deletions roles/translator/src/lib/downstream_sv1/diff_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use super::{Downstream, DownstreamMessages, SetDownstreamTarget};
use super::super::error::{Error, ProxyResult};
use roles_logic_sv2::utils::Mutex;
use std::{ops::Div, sync::Arc};
use tracing::error;
use v1::json_rpc;

use stratum_common::bitcoin::util::uint::Uint256;
Expand Down Expand Up @@ -212,7 +211,7 @@ impl Downstream {
}

let delta_time = timestamp_secs - d.difficulty_mgmt.timestamp_of_last_update;
if delta_time == 0 {
if delta_time <= 15 {
return Ok(None);
}
tracing::debug!("\nDELTA TIME: {:?}", delta_time);
Expand All @@ -225,7 +224,7 @@ impl Downstream {
) {
Ok(hashrate) => hashrate as f32,
Err(e) => {
error!("{:?} -> Probably min_individual_miner_hashrate parameter was not set properly in config file. New hashrate will be automatically adjusted to match the real one.", e);
tracing::debug!("{:?} -> Probably min_individual_miner_hashrate parameter was not set properly in config file. New hashrate will be automatically adjusted to match the real one.", e);
d.difficulty_mgmt.min_individual_miner_hashrate * realized_share_per_min as f32 / d.difficulty_mgmt.shares_per_minute
}
};
Expand All @@ -244,21 +243,35 @@ impl Downstream {
|| (hashrate_delta_percentage >= 30.0) && (delta_time >= 240)
|| (hashrate_delta_percentage >= 15.0) && (delta_time >= 300)
{
if realized_share_per_min < 0.01 {
// realized_share_per_min is 0.0 when d.difficulty_mgmt.submits_since_last_update is 0
// so it's safe to compare realized_share_per_min with == 0.0
if realized_share_per_min == 0.0 {
new_miner_hashrate = match delta_time {
dt if dt < 30 => d.difficulty_mgmt.min_individual_miner_hashrate / 2.0,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate / 3.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate / 5.0,
dt if dt <= 30 => d.difficulty_mgmt.min_individual_miner_hashrate / 1.5,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate / 2.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate / 3.0,
};
hashrate_delta =
new_miner_hashrate - d.difficulty_mgmt.min_individual_miner_hashrate;
}
if (realized_share_per_min > 0.0) && (hashrate_delta_percentage > 1000.0) {
new_miner_hashrate = match delta_time {
dt if dt <= 30 => d.difficulty_mgmt.min_individual_miner_hashrate * 10.0,
dt if dt < 60 => d.difficulty_mgmt.min_individual_miner_hashrate * 5.0,
_ => d.difficulty_mgmt.min_individual_miner_hashrate * 3.0,
};
hashrate_delta =
new_miner_hashrate - d.difficulty_mgmt.min_individual_miner_hashrate;
}
d.difficulty_mgmt.min_individual_miner_hashrate = new_miner_hashrate;
d.difficulty_mgmt.timestamp_of_last_update = timestamp_secs;
d.difficulty_mgmt.submits_since_last_update = 0;
// update channel hashrate (read by upstream)
d.upstream_difficulty_config.super_safe_lock(|c| {
if c.channel_nominal_hashrate + hashrate_delta > 0.0 {
c.channel_nominal_hashrate += hashrate_delta;
} else {
c.channel_nominal_hashrate = 0.0;
}
});
Ok(Some(new_miner_hashrate))
} else {
Expand Down
1 change: 0 additions & 1 deletion roles/translator/src/lib/proxy/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ impl Bridge {
"Submit share error {:?}",
std::str::from_utf8(&e.error_code.to_vec()[..])
);
error!("Make sure to set `min_individual_miner_hashrate` in the config file");
}
Ok(Ok(OnNewShare::SendSubmitShareUpstream((share, _)))) => {
info!("SHARE MEETS UPSTREAM TARGET");
Expand Down
4 changes: 4 additions & 0 deletions test/config/interop-jd-change-upstream/jds-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
4 changes: 4 additions & 0 deletions test/config/interop-jd-translator/jds-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ core_rpc_url = ""
core_rpc_port = 18332
core_rpc_user = ""
core_rpc_pass = ""
# Timeout used for JDS mempool update
[mempool_update_timeout]
unit = "secs"
value = 1

0 comments on commit cd3ab95

Please sign in to comment.